You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:17 UTC

[18/20] storm git commit: STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions

STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b4d33955
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b4d33955
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b4d33955

Branch: refs/heads/master
Commit: b4d33955b15c74ecf2c779d321e941a6345924bc
Parents: 450ed63
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jun 2 13:08:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jul 25 15:26:47 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |   11 +-
 dev-tools/checkstyle.xslt                       |   40 +
 dev-tools/find-checkstyle-issues.py             |   38 +
 docs/Resource_Aware_Scheduler_overview.md       |  132 +-
 docs/STORM-UI-REST-API.md                       |   29 +-
 .../starter/ResourceAwareExampleTopology.java   |  177 +-
 external/storm-mongodb/pom.xml                  |    2 +-
 external/storm-pmml/pom.xml                     |    2 +-
 storm-client-misc/pom.xml                       |    2 +-
 .../src/jvm/org/apache/storm/Config.java        |   16 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |   61 +-
 .../coordination/BatchSubtopologyBuilder.java   |   19 +-
 .../org/apache/storm/daemon/StormCommon.java    |    4 +-
 .../storm/daemon/supervisor/AdvancedFSOps.java  |   35 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |   26 +-
 .../org/apache/storm/generated/Assignment.java  |  413 +++--
 .../org/apache/storm/generated/BoltStats.java   |  440 ++---
 .../apache/storm/generated/ClusterSummary.java  |  108 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |   52 +-
 .../storm/generated/CommonAggregateStats.java   |   44 +-
 .../storm/generated/ComponentPageInfo.java      |  264 +--
 .../org/apache/storm/generated/Credentials.java |   44 +-
 .../apache/storm/generated/ExecutorStats.java   |  168 +-
 .../jvm/org/apache/storm/generated/HBNodes.java |   32 +-
 .../org/apache/storm/generated/HBRecords.java   |   36 +-
 .../storm/generated/LSApprovedWorkers.java      |   44 +-
 .../generated/LSSupervisorAssignments.java      |   48 +-
 .../apache/storm/generated/LSTopoHistory.java   |   64 +-
 .../storm/generated/LSTopoHistoryList.java      |   36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |   36 +-
 .../apache/storm/generated/ListBlobsResult.java |   32 +-
 .../apache/storm/generated/LocalAssignment.java |  147 +-
 .../apache/storm/generated/LocalStateData.java  |   48 +-
 .../org/apache/storm/generated/LogConfig.java   |   48 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  |   36 +-
 .../org/apache/storm/generated/NodeInfo.java    |   32 +-
 .../storm/generated/RebalanceOptions.java       |   44 +-
 .../storm/generated/SettableBlobMeta.java       |   36 +-
 .../apache/storm/generated/SharedMemory.java    |  711 ++++++++
 .../org/apache/storm/generated/SpoutStats.java  |  252 +--
 .../org/apache/storm/generated/StormBase.java   |   92 +-
 .../apache/storm/generated/StormTopology.java   |  531 +++++-
 .../apache/storm/generated/SupervisorInfo.java  |  152 +-
 .../storm/generated/SupervisorPageInfo.java     |   72 +-
 .../storm/generated/SupervisorSummary.java      |   44 +-
 .../storm/generated/TopologyHistoryInfo.java    |   32 +-
 .../apache/storm/generated/TopologyInfo.java    |  160 +-
 .../storm/generated/TopologyPageInfo.java       |  934 ++++++++++-
 .../apache/storm/generated/TopologyStats.java   |  220 +--
 .../apache/storm/generated/WorkerResources.java |  206 ++-
 .../apache/storm/generated/WorkerSummary.java   |   44 +-
 .../storm/metric/cgroup/CGroupMemoryLimit.java  |   16 +
 .../jvm/org/apache/storm/scheduler/Cluster.java |  837 ----------
 .../jvm/org/apache/storm/scheduler/INimbus.java |   49 -
 .../org/apache/storm/scheduler/IScheduler.java  |   47 -
 .../org/apache/storm/scheduler/ISupervisor.java |   45 -
 .../storm/scheduler/SchedulerAssignment.java    |   29 +-
 .../scheduler/SchedulerAssignmentImpl.java      |  141 +-
 .../storm/scheduler/SupervisorDetails.java      |    4 +-
 .../org/apache/storm/scheduler/Topologies.java  |   87 -
 .../apache/storm/scheduler/TopologyDetails.java |  514 ------
 .../org/apache/storm/scheduler/WorkerSlot.java  |   43 +-
 .../storm/scheduler/resource/Component.java     |   54 -
 .../storm/scheduler/resource/RAS_Node.java      |  529 ------
 .../storm/scheduler/resource/RAS_Nodes.java     |  138 --
 .../storm/scheduler/resource/ResourceUtils.java |  207 ---
 .../scheduler/resource/SchedulingResult.java    |  116 --
 .../scheduler/resource/SchedulingState.java     |   56 -
 .../scheduler/resource/SchedulingStatus.java    |   40 -
 .../apache/storm/scheduler/resource/User.java   |  350 ----
 .../DefaultResourceAwareStrategy.java           |  757 ---------
 .../strategies/scheduling/IStrategy.java        |   47 -
 .../topology/BaseConfigurationDeclarer.java     |    9 +-
 .../ComponentConfigurationDeclarer.java         |   36 +
 .../apache/storm/topology/ResourceDeclarer.java |   30 +
 .../storm/topology/SharedOffHeapWithinNode.java |   38 +
 .../topology/SharedOffHeapWithinWorker.java     |   36 +
 .../org/apache/storm/topology/SharedOnHeap.java |   38 +
 .../apache/storm/topology/TopologyBuilder.java  |   33 +-
 .../TransactionalTopologyBuilder.java           |   47 +-
 .../jvm/org/apache/storm/trident/Stream.java    |    7 +
 .../org/apache/storm/trident/TridentState.java  |    7 +
 .../apache/storm/trident/TridentTopology.java   |    5 +
 .../org/apache/storm/trident/graph/Group.java   |   12 +
 .../operation/DefaultResourceDeclarer.java      |   24 +-
 .../trident/operation/ITridentResource.java     |    8 +
 .../topology/TridentTopologyBuilder.java        |   46 +-
 .../apache/storm/utils/ThriftTopologyUtils.java |   40 +-
 storm-client/src/py/storm/Nimbus.py             |   14 +-
 storm-client/src/py/storm/ttypes.py             | 1561 +++++++++++-------
 storm-client/src/storm.thrift                   |   24 +-
 .../org/apache/storm/scheduler/ClusterTest.java |  111 --
 storm-core/src/clj/org/apache/storm/ui/core.clj |    9 +
 .../templates/topology-page-template.html       |  100 +-
 .../apache/storm/trident/integration_test.clj   |    2 +-
 .../scheduler/multitenant_scheduler_test.clj    |  101 +-
 .../clj/org/apache/storm/scheduler_test.clj     |   19 +-
 .../java/org/apache/storm/DaemonConfig.java     |   92 +-
 .../container/ResourceIsolationInterface.java   |   51 +-
 .../storm/container/cgroup/CgroupManager.java   |  161 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  179 +-
 .../storm/daemon/nimbus/TopologyResources.java  |  255 ++-
 .../storm/daemon/supervisor/BasicContainer.java |  200 ++-
 .../storm/daemon/supervisor/Container.java      |  145 +-
 .../daemon/supervisor/ReadClusterState.java     |   17 +-
 .../apache/storm/daemon/supervisor/Slot.java    |   73 +-
 .../apache/storm/localizer/AsyncLocalizer.java  |   28 +-
 .../logging/filters/AccessLoggingFilter.java    |   36 +-
 .../org/apache/storm/scheduler/Cluster.java     | 1050 ++++++++++++
 .../org/apache/storm/scheduler/Component.java   |   88 +
 .../storm/scheduler/DefaultScheduler.java       |    9 +-
 .../apache/storm/scheduler/EvenScheduler.java   |   25 +-
 .../org/apache/storm/scheduler/INimbus.java     |   47 +
 .../org/apache/storm/scheduler/IScheduler.java  |   47 +
 .../storm/scheduler/ISchedulingState.java       |  275 +++
 .../org/apache/storm/scheduler/ISupervisor.java |   50 +
 .../storm/scheduler/SingleTopologyCluster.java  |   46 +
 .../org/apache/storm/scheduler/Topologies.java  |  139 ++
 .../apache/storm/scheduler/TopologyDetails.java |  573 +++++++
 .../storm/scheduler/resource/RAS_Node.java      |  465 ++++++
 .../storm/scheduler/resource/RAS_Nodes.java     |  152 ++
 .../resource/ResourceAwareScheduler.java        |  438 ++---
 .../storm/scheduler/resource/ResourceUtils.java |  175 ++
 .../scheduler/resource/SchedulingResult.java    |   88 +
 .../scheduler/resource/SchedulingStatus.java    |   41 +
 .../apache/storm/scheduler/resource/User.java   |  228 +++
 .../eviction/DefaultEvictionStrategy.java       |   68 +-
 .../strategies/eviction/IEvictionStrategy.java  |   17 +-
 .../DefaultSchedulingPriorityStrategy.java      |   47 +-
 .../priority/ISchedulingPriorityStrategy.java   |   13 +-
 .../DefaultResourceAwareStrategy.java           |  741 +++++++++
 .../strategies/scheduling/IStrategy.java        |   48 +
 .../test/java/org/apache/storm/TestCgroups.java |    5 +-
 .../apache/storm/daemon/nimbus/NimbusTest.java  |   56 +
 .../daemon/supervisor/BasicContainerTest.java   |    2 +
 .../org/apache/storm/scheduler/ClusterTest.java |  111 ++
 .../resource/TestResourceAwareScheduler.java    |  910 +++-------
 .../storm/scheduler/resource/TestUser.java      |   96 +-
 .../TestUtilsForResourceAwareScheduler.java     |  159 +-
 .../eviction/TestDefaultEvictionStrategy.java   |  791 ++-------
 .../TestDefaultResourceAwareStrategy.java       |  216 +--
 141 files changed, 12285 insertions(+), 8927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0078323..c6ef390 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -295,6 +295,7 @@ storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManage
 # Also determines whether the unit tests for cgroup runs.  
 # If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run
 storm.resource.isolation.plugin.enable: false
+storm.cgroup.memory.enforcement.enable: false
 
 # Configs for CGroup support
 storm.cgroup.hierarchy.dir: "/cgroup/storm_resources"
@@ -304,7 +305,13 @@ storm.cgroup.resources:
 storm.cgroup.hierarchy.name: "storm"
 storm.supervisor.cgroup.rootdir: "storm"
 storm.cgroup.cgexec.cmd: "/bin/cgexec"
-storm.cgroup.memory.limit.tolerance.margin.mb: 128.0
+storm.cgroup.memory.limit.tolerance.margin.mb: 0.0
+storm.supervisor.memory.limit.tolerance.margin.mb: 128.0
+storm.supervisor.hard.memory.limit.multiplier: 2.0
+storm.supervisor.hard.memory.limit.overage.mb: 2024
+storm.supervisor.low.memory.threshold.mb: 1024
+storm.supervisor.medium.memory.threshold.mb: 1536
+storm.supervisor.medium.memory.grace.period.ms: 30000
 storm.topology.classpath.beginning.enabled: false
 worker.metrics:
     "CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"
@@ -312,4 +319,4 @@ worker.metrics:
     "CGroupCpu": "org.apache.storm.metric.cgroup.CGroupCpu"
     "CGroupCpuGuarantee": "org.apache.storm.metric.cgroup.CGroupCpuGuarantee"
 
-num.stat.buckets: 20
\ No newline at end of file
+num.stat.buckets: 20

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/dev-tools/checkstyle.xslt
----------------------------------------------------------------------
diff --git a/dev-tools/checkstyle.xslt b/dev-tools/checkstyle.xslt
new file mode 100644
index 0000000..54ff18f
--- /dev/null
+++ b/dev-tools/checkstyle.xslt
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<xsl:stylesheet version="1.0"
+xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
+<xsl:output method="text" omit-xml-declaration="yes" indent="no"/>
+<xsl:template match="/">
+	<xsl:for-each select="checkstyle/file">
+	<xsl:if test="@name=$target">
+		<xsl:value-of select="@name"/>
+		<xsl:text>
+</xsl:text>
+		<xsl:for-each select="error">
+			<xsl:text>	</xsl:text>
+			<xsl:value-of select="@line"/>
+			<xsl:text>: </xsl:text>
+			<xsl:value-of select="@message"/>
+			<xsl:text>
+</xsl:text>
+		</xsl:for-each>
+	</xsl:if>
+	</xsl:for-each>
+</xsl:template>
+
+</xsl:stylesheet> 

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/dev-tools/find-checkstyle-issues.py
----------------------------------------------------------------------
diff --git a/dev-tools/find-checkstyle-issues.py b/dev-tools/find-checkstyle-issues.py
new file mode 100755
index 0000000..ea3520f
--- /dev/null
+++ b/dev-tools/find-checkstyle-issues.py
@@ -0,0 +1,38 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import sys
+import os
+from optparse import OptionParser
+import subprocess
+
+def getCheckstyleFor(f, check_result):
+    f = os.path.abspath(f)
+    check_result = os.path.abspath(check_result)
+    ret = subprocess.check_output(['xsltproc', '--stringparam', 'target', f, './dev-tools/checkstyle.xslt', check_result])
+    if not ret.isspace():
+        print ret
+
+def main():
+    parser = OptionParser(usage="usage: %prog [options]")
+    parser.add_option("-c", "--checkstyle-result", dest="check_result",
+                      type="string", help="the checkstyle-result.xml file to parse", metavar="FILE")
+
+    (options, args) = parser.parse_args()
+
+    for f in args:
+        getCheckstyleFor(f, options.check_result)
+
+if __name__ == "__main__":
+    main()

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/docs/Resource_Aware_Scheduler_overview.md
----------------------------------------------------------------------
diff --git a/docs/Resource_Aware_Scheduler_overview.md b/docs/Resource_Aware_Scheduler_overview.md
index e3e2b56..94d4106 100644
--- a/docs/Resource_Aware_Scheduler_overview.md
+++ b/docs/Resource_Aware_Scheduler_overview.md
@@ -14,10 +14,11 @@ http://www.slideshare.net/HadoopSummit/resource-aware-scheduling-in-apache-storm
 1. [Using Resource Aware Scheduler](#Using-Resource-Aware-Scheduler)
 2. [API Overview](#API-Overview)
     1. [Setting Memory Requirement](#Setting-Memory-Requirement)
-    2. [Setting CPU Requirement](#Setting-CPU-Requirement)
-    3. [Limiting the Heap Size per Worker (JVM) Process](#Limiting-the-Heap-Size-per-Worker-(JVM)Process)
-    4. [Setting Available Resources on Node](#Setting-Available-Resources-on-Node)
-    5. [Other Configurations](#Other-Configurations)
+    2. [Shared Memory Requirement](#Setting-Shared-Memory)
+    3. [Setting CPU Requirement](#Setting-CPU-Requirement)
+    4. [Limiting the Heap Size per Worker (JVM) Process](#Limiting-the-Heap-Size-per-Worker-(JVM)Process)
+    5. [Setting Available Resources on Node](#Setting-Available-Resources-on-Node)
+    6. [Other Configurations](#Other-Configurations)
 3. [Topology Priorities and Per User Resource](#Topology-Priorities-and-Per-User-Resource)
     1. [Setup](#Setup)
     2. [Specifying Topology Priority](#Specifying-Topology-Priority)
@@ -31,9 +32,9 @@ http://www.slideshare.net/HadoopSummit/resource-aware-scheduling-in-apache-storm
 ## Using Resource Aware Scheduler
 
 The user can switch to using the Resource Aware Scheduler by setting the following in *conf/storm.yaml*
-
+```
     storm.scheduler: “org.apache.storm.scheduler.resource.ResourceAwareScheduler”
-    
+```    
 <div id='API-Overview'/>
 ## API Overview
 
@@ -45,38 +46,61 @@ For a Storm Topology, the user can now specify the amount of resources a topolog
 ### Setting Memory Requirement
 
 API to set component memory requirement:
-
+```
     public T setMemoryLoad(Number onHeap, Number offHeap)
-
+```
 Parameters:
 * Number onHeap – The amount of on heap memory an instance of this component will consume in megabytes
 * Number offHeap – The amount of off heap memory an instance of this component will consume in megabytes
 
 The user also has the option to just specify the on heap memory requirement if the component does not have an off heap memory need.
-
+```
     public T setMemoryLoad(Number onHeap)
-
+```
 Parameters:
 * Number onHeap – The amount of on heap memory an instance of this component will consume
 
 If no value is provided for offHeap, 0.0 will be used. If no value is provided for onHeap, or if the API is never called for a component, the default value will be used.
 
 Example of Usage:
-
+```
     SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
     s1.setMemoryLoad(1024.0, 512.0);
     builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                 .shuffleGrouping("word").setMemoryLoad(512.0);
-
+```
 The entire memory requested for this topology is 16.5 GB. That is from 10 spouts with 1GB on heap memory and 0.5 GB off heap memory each and 3 bolts with 0.5 GB on heap memory each.
 
+<div id='Setting-Shared-Memory'/>
+### Shared Memory
+
+In some cases you may have memory that is shared between components. It may be as simple as a large static data structure, or as complex as static data that is memory mapped into a bolt and is shared across workers.  In any case you can specify your shared memory request by
+creating one of `SharedOffHeapWithinNode`, `SharedOffHeapWithinWorker`, or `SharedOnHeap` and adding it to bolts and spouts that use that shared memory.
+
+Example of Usage:
+
+```
+ builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word")
+          .addSharedMemory(new SharedOnHeap(100, "exclaim-cache"));
+```
+
+In the above example all of the "exclaim1" bolts in a worker will share 100MB of memory.
+
+```
+ builder.setBolt("lookup", new LookupBolt(), 3).shuffleGrouping("spout")
+          .addSharedMemory(new SharedOffHeapWithinNode(500, "static-lookup"));
+```
+
+In this example all "lookup" bolts on a given node will share 500 MB of memory off heap. 
+
+
 <div id='Setting-CPU-Requirement'/>
 ### Setting CPU Requirement
 
 API to set component CPU requirement:
-
+```
     public T setCPULoad(Double amount)
-
+```
 Parameters:
 * Number amount – The amount of CPU an instance of this component will consume.
 
@@ -85,55 +109,55 @@ Currently, the amount of CPU resources a component requires or is available on a
 By convention a CPU core typically will get 100 points. If you feel that your processors are more or less powerful you can adjust this accordingly. Heavy tasks that are CPU bound will get 100 points, as they can consume an entire core. Medium tasks should get 50, light tasks 25, and tiny tasks 10. In some cases you have a task that spawns other threads to help with processing. These tasks may need to go above 100 points to express the amount of CPU they are using. If these conventions are followed the common case for a single threaded task the reported Capacity * 100 should be the number of CPU points that the task needs.
 
 Example of Usage:
-
+```
     SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
     s1.setCPULoad(15.0);
     builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                 .shuffleGrouping("word").setCPULoad(10.0);
     builder.setBolt("exclaim2", new HeavyBolt(), 1)
                     .shuffleGrouping("exclaim1").setCPULoad(450.0);
-
+```
 <div id='Limiting-the-Heap-Size-per-Worker-(JVM)Process'/>
 ###	Limiting the Heap Size per Worker (JVM) Process
-
+```
     public void setTopologyWorkerMaxHeapSize(Number size)
-
+```
 Parameters:
 * Number size – The memory limit a worker process will be allocated in megabytes
 
 The user can limit the amount of memory resources the resource aware scheduler allocates to a single worker on a per topology basis by using the above API.  This API is in place so that the users can spread executors to multiple workers.  However, spreading executors to multiple workers may increase the communication latency since executors will not be able to use Disruptor Queue for intra-process communication.
 
 Example of Usage:
-
+```
     Config conf = new Config();
     conf.setTopologyWorkerMaxHeapSize(512.0);
-    
+```
 <div id='Setting-Available-Resources-on-Node'/>
 ### Setting Available Resources on Node
 
 A storm administrator can specify node resource availability by modifying the *conf/storm.yaml* file located in the storm home directory of that node.
 
 A storm administrator can specify how much available memory a node has in megabytes adding the following to *storm.yaml*
-
+```
     supervisor.memory.capacity.mb: [amount<Double>]
-
+```
 A storm administrator can also specify how much available CPU resources a node has available adding the following to *storm.yaml*
-
+```
     supervisor.cpu.capacity: [amount<Double>]
-
+```
 
 Note: that the amount the user can specify for the available CPU is represented using a point system like discussed earlier.
 
 Example of Usage:
-
+```
     supervisor.memory.capacity.mb: 20480.0
     supervisor.cpu.capacity: 100.0
-
+```
 <div id='Other-Configurations'/>
 ### Other Configurations
 
 The user can set some default configurations for the Resource Aware Scheduler in *conf/storm.yaml*:
-
+```
     //default value if on heap memory requirement is not specified for a component 
     topology.component.resources.onheap.memory.mb: 128.0
 
@@ -145,7 +169,7 @@ The user can set some default configurations for the Resource Aware Scheduler in
 
     //default value for the max heap size for a worker  
     topology.worker.max.heap.size.mb: 768.0
-
+```
 <div id='Topology-Priorities-and-Per-User-Resource'/>
 ## Topology Priorities and Per User Resource 
 
@@ -155,14 +179,14 @@ The Resource Aware Scheduler or RAS also has multitenant capabilities since many
 ### Setup
 
 The resource guarantees of a user can be specified in *conf/user-resource-pools.yaml*.  Specify the resource guarantees of a user in the following format:
-
+```
     resource.aware.scheduler.user.pools:
 	[UserId]
 		cpu: [Amount of Guarantee CPU Resources]
 		memory: [Amount of Guarantee Memory Resources]
-
+```
 An example of what *user-resource-pools.yaml* can look like:
-
+```
     resource.aware.scheduler.user.pools:
         jerry:
             cpu: 1000
@@ -173,7 +197,7 @@ An example of what *user-resource-pools.yaml* can look like:
         bobby:
             cpu: 5000.0
             memory: 16384.0
-
+```
 Please note that the specified amount of Guaranteed CPU and Memory can be either an integer or a double
 
 <div id='Specifying-Topology-Priority'/>
@@ -186,9 +210,9 @@ For example we can create a priority level mapping:
     DEV => 20 – 29
 
 Thus, each priority level contains 10 sub priorities. Users can set the priority level of a topology by using the following API
-
+```
     conf.setTopologyPriority(int priority)
-    
+``` 
 Parameters:
 * priority – an integer representing the priority of the topology
 
@@ -198,16 +222,16 @@ Please note that the 0-29 range is not a hard limit.  Thus, a user can set a pri
 ### Specifying Scheduling Strategy
 
 A user can specify on a per topology basis what scheduling strategy to use.  Users can implement the IStrategy interface and define new strategies to schedule specific topologies.  This pluggable interface was created since we realize different topologies might have different scheduling needs.  A user can set the topology strategy within the topology definition by using the API:
-
+```
     public void setTopologyStrategy(Class<? extends IStrategy> clazz)
-    
+```
 Parameters:
 * clazz – The strategy class that implements the IStrategy interface
 
 Example Usage:
-
+```
     conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
-
+```
 A default scheduling is provided.  The DefaultResourceAwareStrategy is implemented based off the scheduling algorithm in the original paper describing resource aware scheduling in Storm:
 
 Peng, Boyang, Mohammad Hosseini, Zhihao Hong, Reza Farivar, and Roy Campbell. "R-storm: Resource-aware scheduling in storm." In Proceedings of the 16th Annual Middleware Conference, pp. 149-161. ACM, 2015.
@@ -220,9 +244,9 @@ http://dl.acm.org/citation.cfm?id=2814808
 ### Specifying Topology Prioritization Strategy
 
 The order of scheduling is a pluggable interface in which a user could define a strategy that prioritizes topologies.  For a user to define his or her own prioritization strategy, he or she needs to implement the ISchedulingPriorityStrategy interface.  A user can set the scheduling priority strategy by setting the *Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY* to point to the class that implements the strategy. For instance:
-
+```
     resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
-    
+```
 A default strategy will be provided.  The following explains how the default scheduling priority strategy works.
 
 **DefaultSchedulingPriorityStrategy**
@@ -251,9 +275,9 @@ When scheduling, RAS sorts users by the average percentage satisfied of resource
 <div id='Specifying-Eviction-Strategy'/>
 ### Specifying Eviction Strategy
 The eviction strategy is used when there are not enough free resources in the cluster to schedule new topologies. If the cluster is full, we need a mechanism to evict topologies so that user resource guarantees can be met and additional resource can be shared fairly among users. The strategy for evicting topologies is also a pluggable interface in which the user can implement his or her own topology eviction strategy.  For a user to implement his or her own eviction strategy, he or she needs to implement the IEvictionStrategy Interface and set *Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented strategy class. For instance:
-
+```
     resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
-
+```
 A default eviction strategy is provided.  The following explains how the default topology eviction strategy works
 
 **DefaultEvictionStrategy**
@@ -270,7 +294,7 @@ We should never evict a topology from a user that does not have his or her resou
 Figuring out resource usage for your topology:
  
 To get an idea of how much memory/CPU your topology is actually using you can add the following to your topology launch code.
- 
+ ```
     //Log all storm metrics
     conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
  
@@ -278,23 +302,23 @@ To get an idea of how much memory/CPU your topology is actually using you can ad
     Map<String, String> workerMetrics = new HashMap<String, String>();
     workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
     conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
- 
+```
 The CPU metrics will require you to add
- 
+``` 
     <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-metrics</artifactId>
         <version>1.0.0</version>
     </dependency>
- 
+```
 as a topology dependency (1.0.0 or higher).
  
 You can then go to your topology on the UI, turn on the system metrics, and find the log that the LoggingMetricsConsumer is writing to.  It will output results in the log like.
- 
+```
     1454526100 node1.nodes.com:6707 -1:__system CPU {user-ms=74480, sys-ms=10780}
     1454526100 node1.nodes.com:6707 -1:__system memory/nonHeap     {unusedBytes=2077536, virtualFreeBytes=-64621729, initBytes=2555904, committedBytes=66699264, maxBytes=-1, usedBytes=64621728}
     1454526100 node1.nodes.com:6707 -1:__system memory/heap  {unusedBytes=573861408, virtualFreeBytes=694644256, initBytes=805306368, committedBytes=657719296, maxBytes=778502144, usedBytes=83857888}
-
+```
 The metrics with -1:__system are generally metrics for the entire worker.  In the example above that worker is running on node1.nodes.com:6707.  These metrics are collected every 60 seconds.  For the CPU you can see that over the 60 seconds this worker used  74480 + 10780 = 85260 ms of CPU time.  This is equivalent to 85260/60000 or about 1.5 cores.
  
 The Memory usage is similar but look at the usedBytes.  offHeap is 64621728 or about 62MB, and onHeap is 83857888 or about 80MB, but you should know what you set your heap to in each of your workers already.  How do you divide this up per bolt/spout?  That is a bit harder and may require some trial and error from your end.
@@ -328,7 +352,7 @@ Racks and nodes will be sorted from best choice to worst choice.  When finding a
 
 3. Average of the all the resource availability  
  -- This is simply taking the average of the percent available (available resources on node or rack divided by the available resources on rack or cluster, respectively).  This situation will only be used when "effective resources" for two objects (rack or node) are the same. Then we consider the average of all the percentages of resources as a metric for sorting. For example:
-
+```
         Avail Resources:
         node 1: CPU = 50 Memory = 1024 Slots = 20
         node 2: CPU = 50 Memory = 8192 Slots = 40
@@ -338,7 +362,7 @@ Racks and nodes will be sorted from best choice to worst choice.  When finding a
         node 1 = 50 / (50+50+1000) = 0.045 (CPU bound)
         node 2 = 50 / (50+50+1000) = 0.045 (CPU bound)
         node 3 = 0 (memory and slots are 0)
-
+```
 Node 1 and node 2 have the same effective resources but clearly node 2 has more resources (memory and slots) than node 1 and we would want to pick node 2 first since there is a higher probability we will be able to schedule more executors on it. This is what the phase 2 averaging does
 
 Thus the sorting follows the following progression. Compare based on 1) and if equal then compare based on 2) and if equal compare based on 3) and if equal we break ties by arbitrarily assigning ordering based on comparing the ids of the node or rack.
@@ -350,7 +374,7 @@ Originally the getBestClustering algorithm for RAS finds the "Best" rack based o
 The new strategy/algorithm to find the "best" rack or node, I dub subordinate resource availability ordering (inspired by Dominant Resource Fairness), sorts racks and nodes by the subordinate (not dominant) resource availability.
 
 For example given 4 racks with the following resource availabilities
-
+```
     //generate some that has alot of memory but little of cpu
     rack-3 Avail [ CPU 100.0 MEM 200000.0 Slots 40 ] Total [ CPU 100.0 MEM 200000.0 Slots 40 ]
     //generate some supervisors that are depleted of one resource
@@ -362,7 +386,7 @@ For example given 4 racks with the following resource availabilities
     //best rack to choose
     rack-0 Avail [ CPU 4000.0 MEM 80000.0 Slots 40 ] Total [ CPU 4000.0 MEM 80000.0 Slots 40 ]
     Cluster Overall Avail [ CPU 12200.0 MEM 410000.0 Slots 200 ] Total [ CPU 12200.0 MEM 410000.0 Slots 200 ]
-
+```
 It is clear that rack-0 is the best rack since it's the most balanced and can potentially schedule the most executors, while rack-2 is the worst rack since rack-2 is depleted of cpu resource thus rendering it unschedulable even though there are other resources available.
 
 We first calculate the resource availability percentage of all the racks for each resource by computing:
@@ -372,13 +396,13 @@ We first calculate the resource availability percentage of all the racks for eac
 We do this calculation to normalize the values otherwise the resource values would not be comparable.
 
 So for our example:
-
+```
     rack-3 Avail [ CPU 0.819672131147541% MEM 48.78048780487805% Slots 20.0% ] effective resources: 0.00819672131147541
     rack-2 Avail [ CPU 0.0% MEM 19.51219512195122% Slots 20.0% ] effective resources: 0.0
     rack-4 Avail [ CPU 50.0% MEM 2.4390243902439024% Slots 20.0% ] effective resources: 0.024390243902439025
     rack-1 Avail [ CPU 16.39344262295082% MEM 9.75609756097561% Slots 20.0% ] effective resources: 0.0975609756097561
     rack-0 Avail [ CPU 32.78688524590164% MEM 19.51219512195122% Slots 20.0% ] effective resources: 0.1951219512195122
-
+```
 The effective resource of a rack, which is also the subordinate resource, is computed by: 
 
     MIN(resource availability percentage of {CPU, Memory, # of free Slots}).

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 53adeb8..b0d06c7 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -290,7 +290,7 @@ Sample response:
     "supervisors": [{ 
         "totalMem": 4096.0, 
         "host":"192.168.10.237",
-        "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e",
         "uptime":"7m 8s",
         "totalCpu":400.0,
         "usedCpu":495.0,
@@ -305,12 +305,12 @@ Sample response:
         "topologyName":"ras",
         "topologyId":"ras-4-1460229987",
         "host":"192.168.10.237",
-        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e",
         "assignedMemOnHeap":704.0,
         "uptime":"2m 47s",
         "uptimeSeconds":167,
         "port":6707,
-        "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log",
+        "workerLogLink":"http:\/\/host:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log",
         "componentNumTasks": {
             "word":5
         },
@@ -322,11 +322,11 @@ Sample response:
         "topologyName":"ras",
         "topologyId":"ras-4-1460229987",
         "host":"192.168.10.237",
-        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e",
         "assignedMemOnHeap":904.0,
         "uptime":"2m 53s",
         "port":6706,
-        "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log",
+        "workerLogLink":"http:\/\/host:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log",
         "componentNumTasks":{
             "exclaim2":2,
             "exclaim1":3,
@@ -602,6 +602,25 @@ Sample response:
     "msgTimeout": 30,
     "windowHint": "10m 0s",
     "schedulerDisplayResource": true,
+    "workers": [{
+        "topologyName": "WordCount3",
+        "topologyId": "WordCount3-1-1402960825",
+        "host": "my-host",
+        "supervisorId": "9124ca9a-42e8-481e-9bf3-a041d9595430",
+        "assignedMemOnHeap": 1452.0,
+        "uptime": "27m 26s",
+        "port": 6702,
+        "workerLogLink": "logs",
+        "componentNumTasks": {
+            "spout": 2,
+            "count": 3,
+            "split": 10
+        },
+        "executorsTotal": 15,
+        "uptimeSeconds": 1646,
+        "assignedCpu": 260.0,
+        "assignedMemOffHeap": 160.0
+    }],
     "topologyStats": [
         {
             "windowPretty": "10m 0s",

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
index 8e9dcc8..861f4f5 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java
@@ -15,17 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.starter;
 
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.TestWordSpout;
-import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.SharedOffHeapWithinNode;
+import org.apache.storm.topology.SharedOnHeap;
 import org.apache.storm.topology.SpoutDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseRichBolt;
@@ -34,65 +39,119 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
 public class ResourceAwareExampleTopology {
-  public static class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
-      _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-      _collector.ack(tuple);
+    public static class ExclamationBolt extends BaseRichBolt {
+        //Have a crummy cache to show off shared memory accounting
+        private static final ConcurrentHashMap<String, String> myCrummyCache =
+            new ConcurrentHashMap<>();
+        private static final int CACHE_SIZE = 100_000;
+        OutputCollector _collector;
+
+        protected static String getFromCache(String key) {
+            return myCrummyCache.get(key);
+        }
+
+        protected static void addToCache(String key, String value) {
+            myCrummyCache.putIfAbsent(key, value);
+            int numToRemove = myCrummyCache.size() - CACHE_SIZE;
+            if (numToRemove > 0) {
+                //Remove something randomly...
+                Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator();
+                for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
+                    it.next();
+                    it.remove();
+                }
+            }
+        }
+
+        @Override
+        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            String orig = tuple.getString(0);
+            String ret = getFromCache(orig);
+            if (ret == null) {
+                ret = orig + "!!!";
+                addToCache(orig, ret);
+            }
+            _collector.emit(tuple, new Values(ret));
+            _collector.ack(tuple);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("word"));
+    public static void main(String[] args) throws Exception {
+        TopologyBuilder builder = new TopologyBuilder();
+
+        //A topology can set resources in terms of CPU and Memory for each component
+        // These can be chained (like with setting the CPU requirement)
+        SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
+        // Or done separately like with setting the
+        // onheap and offheap memory requirement
+        spout.setMemoryLoad(64, 16);
+        //On heap memory is used to help calculate the heap of the java process for the worker
+        // off heap memory is for things like JNI memory allocated off heap, or when using the
+        // ShellBolt or ShellSpout.  In this case the 16 MB of off heap is just as an example
+        // as we are not using it.
+
+        // Some times a Bolt or Spout will have some memory that is shared between the instances
+        // These are typically caches, but could be anything like a static database that is memory
+        // mapped into the processes. These can be declared separately and added to the bolts and
+        // spouts that use them.  Or if only one uses it they can be created inline with the add
+        SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
+        SharedOffHeapWithinNode notImplementedButJustAnExample =
+            new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
+
+        //If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
+        // topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
+        // will be used instead
+        builder
+            .setBolt("exclaim1", new ExclamationBolt(), 3)
+            .shuffleGrouping("word")
+            .addSharedMemory(exclaimCache);
+
+        builder
+            .setBolt("exclaim2", new ExclamationBolt(), 2)
+            .shuffleGrouping("exclaim1")
+            .setMemoryLoad(100)
+            .addSharedMemory(exclaimCache)
+            .addSharedMemory(notImplementedButJustAnExample);
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        //Under RAS the number of workers is determined by the scheduler and the settings in the conf are ignored
+        //conf.setNumWorkers(3);
+
+        //Instead the scheduler lets you set the maximum heap size for any worker.
+        conf.setTopologyWorkerMaxHeapSize(1024.0);
+        //The scheduler generally will try to pack executors into workers until the max heap size is met, but
+        // this can vary depending on the specific scheduling strategy selected.
+        // The reason for this is to try and balance the maximum pause time GC might take (which is larger for larger heaps)
+        // against better performance because of not needing to serialize/deserialize tuples.
+
+        //The priority of a topology describes the importance of the topology in decreasing importance
+        // starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
+        //Recommended range of 0-29 but no hard limit set.
+        // If there are not enough resources in a cluster the priority in combination with how far over a guarantees
+        // a user is will decide which topologies are run and which ones are not.
+        conf.setTopologyPriority(29);
+
+        //set to use the default resource aware strategy when using the MultitenantResourceAwareBridgeScheduler
+        conf.setTopologyStrategy(
+            "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
+
+        String topoName = "test";
+        if (args != null && args.length > 0) {
+            topoName = args[0];
+        }
+
+        StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
     }
-  }
-
-  public static void main(String[] args) throws Exception {
-    TopologyBuilder builder = new TopologyBuilder();
-
-    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 5);
-    //set cpu requirement
-    spout.setCPULoad(20);
-    //set onheap and offheap memory requirement
-    spout.setMemoryLoad(64, 16);
-
-    BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-    //sets cpu requirement.  Not neccessary to set both CPU and memory.
-    //For requirements not set, a default value will be used
-    bolt1.setCPULoad(15);
-
-    BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-    bolt2.setMemoryLoad(100);
-
-    Config conf = new Config();
-    conf.setDebug(true);
-
-    /**
-     * Use to limit the maximum amount of memory (in MB) allocated to one worker process.
-     * Can be used to spread executors to to multiple workers
-     */
-    conf.setTopologyWorkerMaxHeapSize(1024.0);
-
-    //topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
-    //Recommended range of 0-29 but no hard limit set.
-    conf.setTopologyPriority(29);
-
-    // Set strategy to schedule topology. If not specified, default to org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
-    conf.setTopologyStrategy(org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
-
-    String topoName = "test";
-    if (args != null && args.length > 0) {
-        topoName = args[0];
-    }
-    conf.setNumWorkers(3);
-
-    StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createTopology());
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/external/storm-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
index 0e0133c..bfe8e88 100644
--- a/external/storm-mongodb/pom.xml
+++ b/external/storm-mongodb/pom.xml
@@ -81,4 +81,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/external/storm-pmml/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-pmml/pom.xml b/external/storm-pmml/pom.xml
index 839cb3c..21744ed 100644
--- a/external/storm-pmml/pom.xml
+++ b/external/storm-pmml/pom.xml
@@ -91,4 +91,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client-misc/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client-misc/pom.xml b/storm-client-misc/pom.xml
index 9161569..276e2c8 100644
--- a/storm-client-misc/pom.xml
+++ b/storm-client-misc/pom.xml
@@ -60,4 +60,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 311fca7..bc13754 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm;
 
-import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.serialization.IKryoDecorator;
 import org.apache.storm.serialization.IKryoFactory;
 import org.apache.storm.validation.ConfigValidation;
@@ -256,7 +255,10 @@ public class Config extends HashMap<String, Object> {
      * The strategy to use when scheduling a topology with Resource Aware Scheduler
      */
     @NotNull
-    @isImplementationOfClass(implementsClass = IStrategy.class)
+    @isString
+    //NOTE: @isImplementationOfClass(implementsClass = IStrategy.class) is enforced in DaemonConf, so
+    // an error will be thrown by nimbus on topology submission and not by the client prior to submitting
+    // the topology.
     public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
 
     /**
@@ -1758,14 +1760,8 @@ public class Config extends HashMap<String, Object> {
         this.put(Config.TOPOLOGY_PRIORITY, priority);
     }
 
-    /**
-     * Takes as input the strategy class name. Strategy must implement the IStrategy interface
-     * @param clazz class of the strategy to use
-     */
-    public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
-        if (clazz != null) {
-            this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
-        }
+    public void setTopologyStrategy(String strategy) {
+        this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index abb21d9..e73f60e 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -17,37 +17,42 @@
  */
 package org.apache.storm;
 
-import com.google.common.collect.Sets;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.blobstore.NimbusBlobStore;
 import org.apache.storm.dependency.DependencyPropertiesParser;
 import org.apache.storm.dependency.DependencyUploader;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.NotAliveException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyInitialStatus;
 import org.apache.storm.hooks.SubmitterHookException;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.utils.BufferFileInputStream;
+import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.validation.ConfigValidation;
-import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.security.auth.AuthUtils;
-import org.apache.storm.generated.*;
-import org.apache.storm.utils.BufferFileInputStream;
-import org.apache.storm.utils.NimbusClient;
+import com.google.common.collect.Sets;
 
 /**
  * Use this class to submit topologies to run on the Storm cluster. You should run your program
@@ -558,39 +563,9 @@ public class StormSubmitter {
 
     private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
         ConfigValidation.validateFields(topoConf);
-        validateTopologyWorkerMaxHeapSizeMBConfigs(topoConf, topology);
         Utils.validateTopologyBlobStoreMap(topoConf, getListOfKeysFromBlobStore(topoConf));
     }
 
-    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> topoConf, StormTopology topology) {
-        double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, topoConf);
-        Double topologyWorkerMaxHeapSize = ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
-        if(topologyWorkerMaxHeapSize < largestMemReq) {
-            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
-                    + ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
-                    + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
-        }
-    }
-
-    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map<String, Object> topologyConf) {
-        double largestMemoryOperator = 0.0;
-        for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
-            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-            if(memoryRequirement > largestMemoryOperator) {
-                largestMemoryOperator = memoryRequirement;
-            }
-        }
-        for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
-            double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
-                    + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-            if(memoryRequirement > largestMemoryOperator) {
-                largestMemoryOperator = memoryRequirement;
-            }
-        }
-        return largestMemoryOperator;
-    }
-
     private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> topoConf) {
         try (NimbusBlobStore client = new NimbusBlobStore()) {
             client.prepare(topoConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
index 6a167c4..142e5f9 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
@@ -21,6 +21,7 @@ import org.apache.storm.Constants;
 import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.grouping.PartialKeyGrouping;
 import org.apache.storm.topology.BaseConfigurationDeclarer;
@@ -108,6 +109,9 @@ public class BatchSubtopologyBuilder {
                                                                       coordinatedArgs,
                                                                       null),
                                                   component.parallelism);
+            for (SharedMemory request: component.sharedMemory) {
+                input.addSharedMemory(request);
+            }
             for(Map<String, Object> conf: component.componentConfs) {
                 input.addConfigurations(conf);
             }
@@ -129,10 +133,11 @@ public class BatchSubtopologyBuilder {
     }
 
     private static class Component {
-        public IRichBolt bolt;
-        public Integer parallelism;
-        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+        public final IRichBolt bolt;
+        public final Integer parallelism;
+        public final List<InputDeclaration> declarations = new ArrayList<>();
+        public final List<Map<String, Object>> componentConfs = new ArrayList<>();
+        public final Set<SharedMemory> sharedMemory = new HashSet<>();
         
         public Component(IRichBolt bolt, Integer parallelism) {
             this.bolt = bolt;
@@ -443,5 +448,11 @@ public class BatchSubtopologyBuilder {
             _component.componentConfs.add(conf);
             return this;
         }
+
+        @Override
+        public BoltDeclarer addSharedMemory(SharedMemory request) {
+            _component.sharedMemory.add(request);
+            return this;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index 9b5163b..158c2eb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -144,8 +145,7 @@ public class StormCommon {
     }
 
     public static Map<String, Object> allComponents(StormTopology topology) {
-        Map<String, Object> components = new HashMap<>();
-        components.putAll(topology.get_bolts());
+        Map<String, Object> components = new HashMap<>(topology.get_bolts());
         components.putAll(topology.get_spouts());
         components.putAll(topology.get_state_spouts());
         return components;

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 04c4496..bee4588 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -80,25 +80,32 @@ public class AdvancedFSOps implements IAdvancedFSOps {
         @Override
         public void deleteIfExists(File path, String user, String logPrefix) throws IOException {
             String absolutePath = path.getAbsolutePath();
-            LOG.info("Deleting path {}", absolutePath);
-            if (user == null) {
-                user = Files.getOwner(path.toPath()).getName();
-            }
-            List<String> commands = new ArrayList<>();
-            commands.add("rmr");
-            commands.add(absolutePath);
-            ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
-
             if (Utils.checkFileExists(absolutePath)) {
-                // It's possible that permissions were not set properly on the directory, and
-                // the user who is *supposed* to own the dir does not. In this case, try the
-                // delete as the supervisor user.
-                Utils.forceDelete(absolutePath);
+                LOG.info("Deleting path (runAsUser) {}", absolutePath);
+                if (user == null) {
+                    user = Files.getOwner(path.toPath()).getName();
+                }
+                List<String> commands = new ArrayList<>();
+                commands.add("rmr");
+                commands.add(absolutePath);
+                ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix);
+
                 if (Utils.checkFileExists(absolutePath)) {
-                    throw new RuntimeException(path + " was not deleted.");
+                    // It's possible that permissions were not set properly on the directory, and
+                    // the user who is *supposed* to own the dir does not. In this case, try the
+                    // delete as the supervisor user.
+                    Utils.forceDelete(absolutePath);
+                    if (Utils.checkFileExists(absolutePath)) {
+                        throw new RuntimeException(path + " was not deleted.");
+                    }
                 }
             }
         }
+
+        @Override
+        public void deleteIfExists(File path) throws IOException {
+            deleteIfExists(path, null, "UNNAMED");
+        }
         
         @Override
         public void setupStormCodeDir(String user, File path) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
index dc702a3..8fe85f3 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -25,6 +25,7 @@ import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
 import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec;
 import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
 import org.apache.storm.coordination.IBatchBolt;
+import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.grouping.CustomStreamGrouping;
@@ -40,8 +41,10 @@ import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 // Trident subsumes the functionality provided by this class, so it's deprecated
@@ -119,7 +122,11 @@ public class LinearDRPCTopologyBuilder {
                     boltId(i),
                     new CoordinatedBolt(component.bolt, source, idSpec),
                     component.parallelism);
-            
+
+            for (SharedMemory request: component.sharedMemory) {
+                declarer.addSharedMemory(request);
+            }
+
             for(Map<String, Object> conf: component.componentConfs) {
                 declarer.addConfigurations(conf);
             }
@@ -172,11 +179,12 @@ public class LinearDRPCTopologyBuilder {
     }
     
     private static class Component {
-        public IRichBolt bolt;
-        public int parallelism;
-        public List<Map<String, Object>> componentConfs = new ArrayList<>();
-        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-        
+        public final IRichBolt bolt;
+        public final int parallelism;
+        public final List<Map<String, Object>> componentConfs = new ArrayList<>();
+        public final List<InputDeclaration> declarations = new ArrayList<>();
+        public final Set<SharedMemory> sharedMemory = new HashSet<>();
+
         public Component(IRichBolt bolt, int parallelism) {
             this.bolt = bolt;
             this.parallelism = parallelism;
@@ -389,5 +397,11 @@ public class LinearDRPCTopologyBuilder {
             _component.componentConfs.add(conf);
             return this;
         }
+
+        @Override
+        public LinearDRPCInputDeclarer addSharedMemory(SharedMemory request) {
+            _component.sharedMemory.add(request);
+            return this;
+        }
     }
 }