You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:31 UTC

[42/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
new file mode 100755
index 0000000..c38f5fe
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Represents the devices cgroup subsystem core, which controls which
+ * devices the tasks in a cgroup may access. Backed by the kernel files
+ * devices.allow, devices.deny and devices.list under the cgroup directory.
+ */
+public class DevicesCore implements CgroupCore {
+
+    // Root directory of this cgroup in the devices hierarchy.
+    private final String dir;
+
+    private static final String DEVICES_ALLOW = "/devices.allow";
+    private static final String DEVICES_DENY = "/devices.deny";
+    private static final String DEVICES_LIST = "/devices.list";
+
+    // Device type characters as used by the kernel: all / block / character.
+    private static final char TYPE_ALL = 'a';
+    private static final char TYPE_BLOCK = 'b';
+    private static final char TYPE_CHAR = 'c';
+
+    // Access bits OR-ed together into a Record's 'accesses' bitmask.
+    private static final int ACCESS_READ = 1;
+    private static final int ACCESS_WRITE = 2;
+    private static final int ACCESS_CREATE = 4;
+
+    // Access characters as they appear in devices.list ('m' = mknod/create).
+    private static final char ACCESS_READ_CH = 'r';
+    private static final char ACCESS_WRITE_CH = 'w';
+    private static final char ACCESS_CREATE_CH = 'm';
+
+    private static final Logger LOG = LoggerFactory.getLogger(DevicesCore.class);
+
+    public DevicesCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.devices;
+    }
+
+    /**
+     * One entry of the device whitelist, e.g. "c 1:3 rwm".
+     */
+    public static class Record {
+        Device device;
+        char type;
+        int accesses;
+
+        public Record(char type, Device device, int accesses) {
+            this.type = type;
+            this.device = device;
+            this.accesses = accesses;
+        }
+
+        /**
+         * Parses one line read from devices.list. A '*' wildcard in the
+         * major/minor position is rewritten to -1 so it can still be
+         * parsed as an integer below.
+         */
+        public Record(String output) {
+            if (output.contains("*")) {
+                LOG.debug("Pre: {}", output);
+                output = output.replaceAll("\\*", "-1");
+                LOG.debug("After: {}",output);
+            }
+            // Line format: "<type> <major>:<minor> <accesses>", e.g. "b 8:0 rw";
+            // splitting on both ':' and ' ' yields [type, major, minor, accesses].
+            String[] splits = output.split("[: ]");
+            type = splits[0].charAt(0);
+            int major = Integer.parseInt(splits[1]);
+            int minor = Integer.parseInt(splits[2]);
+            device = new Device(major, minor);
+            accesses = 0;
+            for (char c : splits[3].toCharArray()) {
+                if (c == ACCESS_READ_CH) {
+                    accesses |= ACCESS_READ;
+                }
+                if (c == ACCESS_CREATE_CH) {
+                    accesses |= ACCESS_CREATE;
+                }
+                if (c == ACCESS_WRITE_CH) {
+                    accesses |= ACCESS_WRITE;
+                }
+            }
+        }
+
+        /**
+         * Renders this record back into the kernel's "<type> <major>:<minor> <accesses>" form.
+         */
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(type);
+            sb.append(' ');
+            sb.append(device.major);
+            sb.append(':');
+            sb.append(device.minor);
+            sb.append(' ');
+            sb.append(getAccessesFlag(accesses));
+
+            return sb.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + accesses;
+            result = prime * result + ((device == null) ? 0 : device.hashCode());
+            result = prime * result + type;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            Record other = (Record) obj;
+            if (accesses != other.accesses) {
+                return false;
+            }
+            if (device == null) {
+                if (other.device != null) {
+                    return false;
+                }
+            } else if (!device.equals(other.device)) {
+                return false;
+            }
+            if (type != other.type) {
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Parses every line of a devices.list read into a Record array.
+         */
+        public static Record[] parseRecordList(List<String> output) {
+            Record[] records = new Record[output.size()];
+            for (int i = 0, l = output.size(); i < l; i++) {
+                records[i] = new Record(output.get(i));
+            }
+
+            return records;
+        }
+
+        /**
+         * Converts an access bitmask back to its character form ("r"/"w"/"m"),
+         * always emitted in r, w, m order.
+         */
+        public static StringBuilder getAccessesFlag(int accesses) {
+            StringBuilder sb = new StringBuilder();
+            if ((accesses & ACCESS_READ) != 0) {
+                sb.append(ACCESS_READ_CH);
+            }
+            if ((accesses & ACCESS_WRITE) != 0) {
+                sb.append(ACCESS_WRITE_CH);
+            }
+            if ((accesses & ACCESS_CREATE) != 0) {
+                sb.append(ACCESS_CREATE_CH);
+            }
+            return sb;
+        }
+    }
+
+    // Writes a single whitelist record to the given control file
+    // (devices.allow or devices.deny).
+    private void setPermission(String prop, char type, Device device, int accesses) throws IOException {
+        Record record = new Record(type, device, accesses);
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, prop), record.toString());
+    }
+
+    public void setAllow(char type, Device device, int accesses) throws IOException {
+        setPermission(DEVICES_ALLOW, type, device, accesses);
+    }
+
+    public void setDeny(char type, Device device, int accesses) throws IOException {
+        setPermission(DEVICES_DENY, type, device, accesses);
+    }
+
+    /**
+     * Reads and parses the full device whitelist from devices.list.
+     */
+    public Record[] getList() throws IOException {
+        List<String> output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, DEVICES_LIST));
+        return Record.parseRecordList(output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
new file mode 100755
index 0000000..89e13dd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.Locale;
+
+/**
+ * Represents the freezer cgroup subsystem core, used to suspend (freeze)
+ * and resume (thaw) every task in a cgroup through the freezer.state file.
+ */
+public class FreezerCore implements CgroupCore {
+
+    public static final String FREEZER_STATE = "/freezer.state";
+
+    // Root directory of this cgroup in the freezer hierarchy.
+    private final String dir;
+
+    public FreezerCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.freezer;
+    }
+
+    /**
+     * Writes the desired state (e.g. "FROZEN", "THAWED") to freezer.state.
+     * Locale.ROOT makes the upper-casing locale-independent; with the
+     * default locale, e.g. Turkish, "freezing" would become "FREEZİNG"
+     * (dotted capital İ), which the kernel would reject.
+     */
+    public void setState(State state) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase(Locale.ROOT));
+    }
+
+    /**
+     * Reads the current state from the first line of freezer.state.
+     * Returns null if the value is not one of the recognized states.
+     */
+    public State getState() throws IOException {
+        return State.getStateValue(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE)).get(0));
+    }
+
+    public enum State {
+        frozen, freezing, thawed;
+
+        /**
+         * Maps the kernel's upper-case state string to an enum constant,
+         * or null for any unrecognized input.
+         */
+        public static State getStateValue(String state) {
+            if (state.equals("FROZEN")) {
+                return frozen;
+            }
+            else if (state.equals("FREEZING")) {
+                return freezing;
+            }
+            else if (state.equals("THAWED")) {
+                return thawed;
+            }
+            else {
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
new file mode 100755
index 0000000..9bd6a72
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+
+/**
+ * Represents the memory cgroup subsystem core, exposing read/write access
+ * to the memory.* control files (usage, limits, fail counts, swappiness,
+ * hierarchy and OOM-control settings) of a cgroup directory.
+ */
+public class MemoryCore implements CgroupCore {
+
+    public static final String MEMORY_STAT = "/memory.stat";
+    public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes";
+    public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes";
+    public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes";
+    public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes";
+    public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes";
+    public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes";
+    public static final String MEMORY_FAILCNT = "/memory.failcnt";
+    public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt";
+    public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty";
+    public static final String MEMORY_SWAPPINESS = "/memory.swappiness";
+    public static final String MEMORY_USE_HIERARCHY = "/memory.use_hierarchy";
+    public static final String MEMORY_OOM_CONTROL = "/memory.oom_control";
+
+    // Root directory of this cgroup in the memory hierarchy.
+    private final String dir;
+
+    public MemoryCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.memory;
+    }
+
+    /**
+     * Immutable snapshot of the counters reported by memory.stat, in the
+     * order the kernel lists them (per-cgroup values followed by the
+     * hierarchical "total_*" values).
+     */
+    public static class Stat {
+        public final long cacheSize;
+        public final long rssSize;
+        public final long mappedFileSize;
+        public final long pgpginNum;
+        public final long pgpgoutNum;
+        public final long swapSize;
+        public final long activeAnonSize;
+        public final long inactiveAnonSize;
+        public final long activeFileSize;
+        public final long inactiveFileSize;
+        public final long unevictableSize;
+        public final long hierarchicalMemoryLimitSize;
+        public final long hierarchicalMemSwapLimitSize;
+        public final long totalCacheSize;
+        public final long totalRssSize;
+        public final long totalMappedFileSize;
+        public final long totalPgpginNum;
+        public final long totalPgpgoutNum;
+        public final long totalSwapSize;
+        public final long totalActiveAnonSize;
+        public final long totalInactiveAnonSize;
+        public final long totalActiveFileSize;
+        public final long totalInactiveFileSize;
+        public final long totalUnevictableSize;
+        public final long totalHierarchicalMemoryLimitSize;
+        public final long totalHierarchicalMemSwapLimitSize;
+
+        // NOTE(review): kernel memory.stat lines are "<name> <value>" pairs and
+        // getStat() passes only the FIRST line read from the file, so this
+        // newline split followed by bare Long.parseLong on each piece looks
+        // like it cannot parse real kernel output — verify against
+        // CgroupUtils.readFileByLine semantics and an actual memory.stat dump.
+        public Stat(String output) {
+            String[] splits = output.split("\n");
+            this.cacheSize = Long.parseLong(splits[0]);
+            this.rssSize = Long.parseLong(splits[1]);
+            this.mappedFileSize = Long.parseLong(splits[2]);
+            this.pgpginNum = Long.parseLong(splits[3]);
+            this.pgpgoutNum = Long.parseLong(splits[4]);
+            this.swapSize = Long.parseLong(splits[5]);
+            this.inactiveAnonSize = Long.parseLong(splits[6]);
+            this.activeAnonSize = Long.parseLong(splits[7]);
+            this.inactiveFileSize = Long.parseLong(splits[8]);
+            this.activeFileSize = Long.parseLong(splits[9]);
+            this.unevictableSize = Long.parseLong(splits[10]);
+            this.hierarchicalMemoryLimitSize = Long.parseLong(splits[11]);
+            this.hierarchicalMemSwapLimitSize = Long.parseLong(splits[12]);
+            this.totalCacheSize = Long.parseLong(splits[13]);
+            this.totalRssSize = Long.parseLong(splits[14]);
+            this.totalMappedFileSize = Long.parseLong(splits[15]);
+            this.totalPgpginNum = Long.parseLong(splits[16]);
+            this.totalPgpgoutNum = Long.parseLong(splits[17]);
+            this.totalSwapSize = Long.parseLong(splits[18]);
+            this.totalInactiveAnonSize = Long.parseLong(splits[19]);
+            this.totalActiveAnonSize = Long.parseLong(splits[20]);
+            this.totalInactiveFileSize = Long.parseLong(splits[21]);
+            this.totalActiveFileSize = Long.parseLong(splits[22]);
+            this.totalUnevictableSize = Long.parseLong(splits[23]);
+            this.totalHierarchicalMemoryLimitSize = Long.parseLong(splits[24]);
+            this.totalHierarchicalMemSwapLimitSize = Long.parseLong(splits[25]);
+        }
+    }
+
+    /**
+     * Reads memory.stat and parses it into a Stat snapshot.
+     * See the NOTE(review) on the Stat constructor about only line 0 being used.
+     */
+    public Stat getStat() throws IOException {
+        String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_STAT)).get(0);
+        Stat stat = new Stat(output);
+        return stat;
+    }
+
+    // Current physical memory usage in bytes (memory.usage_in_bytes).
+    public long getPhysicalUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
+    }
+
+    // Current memory+swap usage in bytes (memory.memsw.usage_in_bytes).
+    public long getWithSwapUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
+    }
+
+    // High-water mark of physical memory usage in bytes.
+    public long getMaxPhysicalUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
+    }
+
+    // High-water mark of memory+swap usage in bytes.
+    public long getMaxWithSwapUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
+    }
+
+    // Sets the hard limit on physical memory, in bytes.
+    public void setPhysicalUsageLimit(long value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
+    }
+
+    public long getPhysicalUsageLimit() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
+    }
+
+    // Sets the hard limit on memory+swap, in bytes.
+    public void setWithSwapUsageLimit(long value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
+    }
+
+    public long getWithSwapUsageLimit() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
+    }
+
+    // Number of times the physical memory limit has been hit (memory.failcnt).
+    public int getPhysicalFailCount() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_FAILCNT)).get(0));
+    }
+
+    // Number of times the memory+swap limit has been hit.
+    public int getWithSwapFailCount() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
+    }
+
+    // Writing 0 to memory.force_empty asks the kernel to reclaim
+    // as much memory from this cgroup as possible.
+    public void clearForceEmpty() throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
+    }
+
+    public void setSwappiness(int value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
+    }
+
+    public int getSwappiness() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
+    }
+
+    // Enables/disables hierarchical accounting; written as 1 or 0.
+    public void setUseHierarchy(boolean flag) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
+    }
+
+    public boolean isUseHierarchy() throws IOException {
+        int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
+        return output > 0;
+    }
+
+    // Writes 1/0 to memory.oom_control (presumably toggling oom_kill_disable —
+    // confirm against the kernel memory cgroup documentation).
+    public void setOomControl(boolean flag) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
+    }
+
+    public boolean isOomControl() throws IOException {
+        String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
+        // Take the second whitespace-separated token of the first line,
+        // i.e. the value part of a "<name> <value>" pair.
+        output = output.split("\n")[0].split("[\\s]")[1];
+        int value = Integer.parseInt(output);
+        return value > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
new file mode 100755
index 0000000..d3dd5a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+
+/**
+ * Represents the net_cls cgroup subsystem core, which tags packets from a
+ * cgroup with a class identifier written to net_cls.classid. The classid
+ * has the form 0xAAAABBBB: AAAA is the major handle and BBBB the minor
+ * handle, both in hexadecimal.
+ */
+public class NetClsCore implements CgroupCore {
+
+    public static final String NET_CLS_CLASSID = "/net_cls.classid";
+
+    // Root directory of this cgroup in the net_cls hierarchy.
+    private final String dir;
+
+    public NetClsCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_cls;
+    }
+
+    /**
+     * Renders num as a 4-digit, zero-padded hexadecimal string, keeping only
+     * the 4 least-significant hex digits for wider values.
+     * Fixes the original implementation, which used the DECIMAL string of
+     * num ("num + \"\"") and truncated to 5 characters (l - 4 - 1).
+     */
+    private StringBuilder toHex(int num) {
+        String hex = Integer.toHexString(num);
+        StringBuilder sb = new StringBuilder();
+        int l = hex.length();
+        if (l > 4) {
+            hex = hex.substring(l - 4, l);
+        }
+        for (; l < 4; l++) {
+            sb.append('0');
+        }
+        sb.append(hex);
+        return sb;
+    }
+
+    /**
+     * Writes the classid as "0x<major><minor>" with each handle rendered
+     * as 4 hex digits.
+     */
+    public void setClassId(int major, int minor) throws IOException {
+        StringBuilder sb = new StringBuilder("0x");
+        sb.append(toHex(major));
+        sb.append(toHex(minor));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+    }
+
+    /**
+     * Reads net_cls.classid (reported by the kernel as a decimal integer),
+     * converts it to hex and splits it into major (all but the last 4 hex
+     * digits) and minor (last 4 hex digits) handles. The hex substrings are
+     * parsed with radix 16 — the original parsed them in base 10, which
+     * throws NumberFormatException for any digit a-f.
+     */
+    public Device getClassId() throws IOException {
+        String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+        output = Integer.toHexString(Integer.parseInt(output));
+        int major = Integer.parseInt(output.substring(0, output.length() - 4), 16);
+        int minor = Integer.parseInt(output.substring(output.length() - 4), 16);
+        return new Device(major, minor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
new file mode 100755
index 0000000..b83b81a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the net_prio cgroup subsystem core. Exposes the cgroup's
+ * priority index (net_prio.prioidx) and the per-interface priority map
+ * (net_prio.ifpriomap), where each line is "<interface> <priority>".
+ */
+public class NetPrioCore implements CgroupCore {
+
+    public static final String NET_PRIO_PRIOIDX = "/net_prio.prioidx";
+    public static final String NET_PRIO_IFPRIOMAP = "/net_prio.ifpriomap";
+
+    // Root directory of this cgroup in the net_prio hierarchy.
+    private final String dir;
+
+    public NetPrioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_prio;
+    }
+
+    /**
+     * Reads the kernel-assigned priority index of this cgroup.
+     */
+    public int getPrioId() throws IOException {
+        String firstLine = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0);
+        return Integer.parseInt(firstLine);
+    }
+
+    /**
+     * Assigns a priority to traffic leaving the given network interface by
+     * writing "<iface> <priority>" to net_prio.ifpriomap.
+     */
+    public void setIfPrioMap(String iface, int priority) throws IOException {
+        String entry = iface + ' ' + priority;
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP), entry);
+    }
+
+    /**
+     * Reads the full interface-to-priority map, one "<iface> <priority>"
+     * pair per line of net_prio.ifpriomap.
+     */
+    public Map<String, Integer> getIfPrioMap() throws IOException {
+        List<String> lines = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP));
+        Map<String, Integer> priorities = new HashMap<String, Integer>();
+        for (String line : lines) {
+            String[] parts = line.split(" ");
+            priorities.put(parts[0], Integer.valueOf(parts[1]));
+        }
+        return priorities;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
new file mode 100644
index 0000000..2a9a1d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.coordination.CoordinatedBolt.TimeoutCallback;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An IRichBolt adapter that runs an IBatchBolt instance per batch id.
+ * The wrapped bolt is kept in serialized form so a fresh deserialized
+ * copy can be created for each batch; instances are tracked per batch id
+ * in _openTransactions and retired when the batch finishes or times out.
+ */
+public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
+    public static final Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+
+    // Serialized form of the user's IBatchBolt; deserialized once per batch.
+    byte[] _boltSer;
+    // Live per-batch bolt instances, keyed by batch id (the tuple's first value).
+    Map<Object, IBatchBolt> _openTransactions;
+    Map _conf;
+    TopologyContext _context;
+    BatchOutputCollectorImpl _collector;
+    
+    public BatchBoltExecutor(IBatchBolt bolt) {
+        _boltSer = Utils.javaSerialize(bolt);
+    }
+    
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _conf = conf;
+        _context = context;
+        _collector = new BatchOutputCollectorImpl(collector);
+        _openTransactions = new HashMap<>();
+    }
+
+    /**
+     * Routes the tuple to the batch bolt for its batch id (first tuple value),
+     * acking on success and failing the tuple when the bolt signals
+     * FailedException.
+     */
+    @Override
+    public void execute(Tuple input) {
+        Object id = input.getValue(0);
+        IBatchBolt bolt = getBatchBolt(id);
+        try {
+             bolt.execute(input);
+            _collector.ack(input);
+        } catch(FailedException e) {
+            LOG.error("Failed to process tuple in batch", e);
+            _collector.fail(input);                
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    /**
+     * Called when a batch completes: removes the batch's bolt from the
+     * open-transaction map, then lets it finish the batch. Note the removal
+     * happens BEFORE finishBatch(), so a failure there still retires the entry.
+     */
+    @Override
+    public void finishedId(Object id) {
+        IBatchBolt bolt = getBatchBolt(id);
+        _openTransactions.remove(id);
+        bolt.finishBatch();
+    }
+
+    // Called when a batch attempt times out; drop its bolt without finishing.
+    @Override
+    public void timeoutId(Object attempt) {
+        _openTransactions.remove(attempt);        
+    }    
+    
+
+    // Output fields / component config are delegated to a throwaway
+    // deserialized copy of the wrapped bolt.
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        newTransactionalBolt().declareOutputFields(declarer);
+    }
+    
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return newTransactionalBolt().getComponentConfiguration();
+    }
+    
+    // Returns the bolt instance for this batch id, lazily deserializing and
+    // preparing a new one on first sight of the id.
+    private IBatchBolt getBatchBolt(Object id) {
+        IBatchBolt bolt = _openTransactions.get(id);
+        if(bolt==null) {
+            bolt = newTransactionalBolt();
+            bolt.prepare(_conf, _context, _collector, id);
+            _openTransactions.put(id, bolt);            
+        }
+        return bolt;
+    }
+    
+    // Deserializes a fresh copy of the wrapped bolt.
+    private IBatchBolt newTransactionalBolt() {
+        return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
new file mode 100644
index 0000000..e1581eb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.utils.Utils;
+import java.util.List;
+
+/**
+ * Output collector abstraction used by batch bolts. Concrete subclasses
+ * decide how emitted tuples are routed; the default-stream overloads here
+ * simply delegate to the stream-qualified abstract methods using
+ * Utils.DEFAULT_STREAM_ID.
+ */
+public abstract class BatchOutputCollector {
+
+    /**
+     * Emits a tuple to the default output stream.
+     */
+    public List<Integer> emit(List<Object> tuple) {
+        return emit(Utils.DEFAULT_STREAM_ID, tuple);
+    }
+
+    /**
+     * Emits a tuple to the given stream; returns the task ids the tuple
+     * was sent to.
+     */
+    public abstract List<Integer> emit(String streamId, List<Object> tuple);
+    
+    /**
+     * Emits a tuple to the specified task on the default output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message.
+     */
+    public void emitDirect(int taskId, List<Object> tuple) {
+        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
+    }
+    
+    /**
+     * Emits a tuple directly to the given task on the given (direct) stream.
+     */
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); 
+    
+    /**
+     * Reports an error to the framework on behalf of the batch bolt.
+     */
+    public abstract void reportError(Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
new file mode 100644
index 0000000..246ae5d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import java.util.List;
+
+/**
+ * {@link BatchOutputCollector} implementation that delegates every call to a
+ * wrapped {@link OutputCollector}, and additionally exposes the wrapped
+ * collector's ack/fail operations.
+ */
+public class BatchOutputCollectorImpl extends BatchOutputCollector {
+    OutputCollector _collector;
+    
+    public BatchOutputCollectorImpl(OutputCollector collector) {
+        _collector = collector;
+    }
+    
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple) {
+        return _collector.emit(streamId, tuple);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+        _collector.emitDirect(taskId, streamId, tuple);
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        _collector.reportError(error);
+    }
+    
+    /** Acks the given tuple on the underlying collector. */
+    public void ack(Tuple tup) {
+        _collector.ack(tup);
+    }
+    
+    /** Fails the given tuple on the underlying collector. */
+    public void fail(Tuple tup) {
+        _collector.fail(tup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
new file mode 100644
index 0000000..bd72c89
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.Constants;
+import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.topology.BaseConfigurationDeclarer;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.InputDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helper for assembling a batch-processing subtopology. Bolts registered here
+ * are wrapped in {@link CoordinatedBolt} when {@link #extendTopology} is
+ * called, so that bolts subscribed to other bolts in the subtopology receive
+ * tuple counts on {@link Constants#COORDINATED_STREAM_ID} and can detect when
+ * a batch has finished.
+ */
+public class BatchSubtopologyBuilder {
+    // Non-master bolts of the subtopology, keyed by component id.
+    Map<String, Component> _bolts = new HashMap<String, Component>();
+    Component _masterBolt;
+    String _masterId;
+    
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
+        Integer p = boltParallelism == null ? null : boltParallelism.intValue();
+        _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
+        _masterId = masterBoltId;
+    }
+    
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
+        this(masterBoltId, masterBolt, null);
+    }
+    
+    /** Returns a declarer for wiring inputs into the master bolt. */
+    public BoltDeclarer getMasterDeclarer() {
+        return new BoltDeclarerImpl(_masterBolt);
+    }
+        
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
+        return setBolt(id, bolt, null);
+    }
+    
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
+        return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
+    }     
+    
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
+        return setBolt(id, bolt, null);
+    }    
+    
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
+        return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
+    }
+    
+    // All public setBolt overloads funnel into this one. Declarations are only
+    // recorded here; they are replayed against a real builder in extendTopology.
+    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
+        Integer p = null;
+        if(parallelism!=null) p = parallelism.intValue();
+        Component component = new Component(bolt, p);
+        _bolts.put(id, component);
+        return new BoltDeclarerImpl(component);
+    }
+    
+    /**
+     * Materializes the recorded subtopology into the given builder. Every bolt
+     * is wrapped in a CoordinatedBolt. For each non-master bolt, the master
+     * contributes a single coordination count while every other subscribed
+     * source contributes one count per task.
+     */
+    public void extendTopology(TopologyBuilder builder) {
+        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
+        for(InputDeclaration decl: _masterBolt.declarations) {
+            decl.declare(declarer);
+        }
+        for(Map conf: _masterBolt.componentConfs) {
+            declarer.addConfigurations(conf);
+        }
+        for(String id: _bolts.keySet()) {
+            Component component = _bolts.get(id);
+            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
+            for(String c: componentBoltSubscriptions(component)) {
+                SourceArgs source;
+                if(c.equals(_masterId)) {
+                    source = SourceArgs.single();
+                } else {
+                    source = SourceArgs.all();
+                }
+                coordinatedArgs.put(c, source);                    
+            }
+            
+
+            BoltDeclarer input = builder.setBolt(id,
+                                                  new CoordinatedBolt(component.bolt,
+                                                                      coordinatedArgs,
+                                                                      null),
+                                                  component.parallelism);
+            for(Map conf: component.componentConfs) {
+                input.addConfigurations(conf);
+            }
+            // Coordination counts are always delivered via direct grouping.
+            for(String c: componentBoltSubscriptions(component)) {
+                input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
+            }
+            for(InputDeclaration d: component.declarations) {
+                d.declare(input);
+            }
+        }        
+    }
+        
+    /** Component ids this component subscribes to, derived from its recorded inputs. */
+    private Set<String> componentBoltSubscriptions(Component component) {
+        Set<String> ret = new HashSet<String>();
+        for(InputDeclaration d: component.declarations) {
+            ret.add(d.getComponent());
+        }
+        return ret;
+    }
+
+    // Holds a bolt plus everything declared for it prior to topology construction.
+    private static class Component {
+        public IRichBolt bolt;
+        public Integer parallelism;
+        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
+        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+        
+        public Component(IRichBolt bolt, Integer parallelism) {
+            this.bolt = bolt;
+            this.parallelism = parallelism;
+        }
+    }
+    
+    // A deferred input declaration: replayed against the real declarer later.
+    private static interface InputDeclaration {
+        void declare(InputDeclarer declarer);
+        String getComponent();
+    }
+        
+    // Records grouping declarations instead of applying them immediately; each
+    // grouping method stores an InputDeclaration closure for replay in
+    // extendTopology, and remembers the source component for coordination wiring.
+    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+        Component _component;
+        
+        public BoltDeclarerImpl(Component component) {
+            _component = component;
+        }
+        
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, fields);
+                }
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, streamId, fields);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+        
+        @Override
+        public BoltDeclarer noneGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer noneGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer directGrouping(final String component) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer directGrouping(final String component, final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component, streamId);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+        
+        @Override
+        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, grouping);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;        
+        }
+
+        @Override
+        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, streamId, grouping);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return component;
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.grouping(stream, grouping);
+                }                
+
+                @Override
+                public String getComponent() {
+                    return stream.get_componentId();
+                }                
+            });
+            return this;
+        }
+        
+        private void addDeclaration(InputDeclaration declaration) {
+            _component.declarations.add(declaration);
+        }
+
+        @Override
+        public BoltDeclarer addConfigurations(Map<String, Object> conf) {
+            _component.componentConfs.add(conf);
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
new file mode 100644
index 0000000..71919a0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.topology.FailedException;
+import java.util.Map.Entry;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import java.util.Collection;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.TimeCacheMap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Coordination requires the request ids to be globally unique for a while. This is so it doesn't get confused
+ * in the case of retries.
+ */
+public class CoordinatedBolt implements IRichBolt {
+    public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
+
+    /** Implemented by delegate bolts that want to be told when an id's batch has finished. */
+    public static interface FinishedCallback {
+        void finishedId(Object id);
+    }
+
+    /** Implemented by delegate bolts that want to be told when tracking of an id times out. */
+    public static interface TimeoutCallback {
+        void timeoutId(Object id);
+    }
+    
+    
+    /** Describes how a source contributes coordination reports: one per component, or one per task. */
+    public static class SourceArgs implements Serializable {
+        public boolean singleCount;
+
+        protected SourceArgs(boolean singleCount) {
+            this.singleCount = singleCount;
+        }
+
+        public static SourceArgs single() {
+            return new SourceArgs(true);
+        }
+
+        public static SourceArgs all() {
+            return new SourceArgs(false);
+        }
+        
+        @Override
+        public String toString() {
+            return "<Single: " + singleCount + ">";
+        }
+    }
+
+    // Wraps the real collector so every emit is counted per target task; these
+    // counts are later forwarded downstream on the coordinated stream so
+    // receivers know how many tuples to expect for an id.
+    public class CoordinatedOutputCollector implements IOutputCollector {
+        IOutputCollector _delegate;
+
+        public CoordinatedOutputCollector(IOutputCollector delegate) {
+            _delegate = delegate;
+        }
+
+        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
+            List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
+            updateTaskCounts(tuple.get(0), tasks);
+            return tasks;
+        }
+
+        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
+            updateTaskCounts(tuple.get(0), Arrays.asList(task));
+            _delegate.emitDirect(task, stream, anchors, tuple);
+        }
+
+        public void ack(Tuple tuple) {
+            Object id = tuple.getValue(0);
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null)
+                    track.receivedTuples++;
+            }
+            // The ack may complete the batch; if the batch failed meanwhile,
+            // fail the tuple instead of acking it.
+            boolean failed = checkFinishId(tuple, TupleType.REGULAR);
+            if(failed) {
+                _delegate.fail(tuple);                
+            } else {
+                _delegate.ack(tuple);
+            }
+        }
+
+        public void fail(Tuple tuple) {
+            Object id = tuple.getValue(0);
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null)
+                    track.failed = true;
+            }
+            checkFinishId(tuple, TupleType.REGULAR);
+            _delegate.fail(tuple);
+        }
+
+        public void resetTimeout(Tuple tuple) {
+            _delegate.resetTimeout(tuple);
+        }
+        
+        public void reportError(Throwable error) {
+            _delegate.reportError(error);
+        }
+
+
+        // Increments the per-task emit count for the id carried in field 0.
+        private void updateTaskCounts(Object id, List<Integer> tasks) {
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null) {
+                    Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
+                    for(Integer task: tasks) {
+                        int newCount = Utils.get(taskEmittedTuples, task, 0) + 1;
+                        taskEmittedTuples.put(task, newCount);
+                    }
+                }
+            }
+        }
+    }
+
+    private Map<String, SourceArgs> _sourceArgs;
+    private IdStreamSpec _idStreamSpec;
+    private IRichBolt _delegate;
+    // Number of coordination reports expected per id; only set when there are sources.
+    private Integer _numSourceReports;
+    // Tasks subscribed to this bolt's coordinated stream (computed in prepare).
+    private List<Integer> _countOutTasks = new ArrayList<>();
+    private OutputCollector _collector;
+    // Per-id tracking state; entries expire via the TimeCacheMap, and all
+    // access is guarded by synchronizing on this map.
+    private TimeCacheMap<Object, TrackingInfo> _tracked;
+
+    /** Mutable bookkeeping for a single in-flight id. Guarded by the _tracked lock. */
+    public static class TrackingInfo {
+        int reportCount = 0;          // coordination reports received so far
+        int expectedTupleCount = 0;   // sum of counts announced by upstream reports
+        int receivedTuples = 0;       // tuples acked for this id so far
+        boolean failed = false;
+        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
+        boolean receivedId = false;
+        boolean finished = false;
+        List<Tuple> ackTuples = new ArrayList<>(); // coordination tuples held until the id resolves
+        
+        @Override
+        public String toString() {
+            return "reportCount: " + reportCount + "\n" +
+                   "expectedTupleCount: " + expectedTupleCount + "\n" +
+                   "receivedTuples: " + receivedTuples + "\n" +
+                   "failed: " + failed + "\n" +
+                   taskEmittedTuples.toString();
+        }
+    }
+
+    
+    /** Identifies the (component, stream) pair that carries batch ids. */
+    public static class IdStreamSpec implements Serializable {
+        GlobalStreamId _id;
+        
+        public GlobalStreamId getGlobalStreamId() {
+            return _id;
+        }
+
+        public static IdStreamSpec makeDetectSpec(String component, String stream) {
+            return new IdStreamSpec(component, stream);
+        }        
+        
+        protected IdStreamSpec(String component, String stream) {
+            _id = new GlobalStreamId(component, stream);
+        }
+    }
+    
+    public CoordinatedBolt(IRichBolt delegate) {
+        this(delegate, null, null);
+    }
+
+    public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
+        this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
+    }
+    
+    public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
+        _sourceArgs = sourceArgs;
+        if(_sourceArgs==null) _sourceArgs = new HashMap<>();
+        _delegate = delegate;
+        _idStreamSpec = idStreamSpec;
+    }
+    
+    /**
+     * Sets up id tracking (with a timeout callback when the delegate supports
+     * one), records which tasks listen on the coordinated stream, and computes
+     * the number of coordination reports expected per id.
+     */
+    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+        TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
+        if(_delegate instanceof TimeoutCallback) {
+            callback = new TimeoutItems();
+        }
+        _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback);
+        _collector = collector;
+        _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
+        for(String component: Utils.get(context.getThisTargets(),
+                                        Constants.COORDINATED_STREAM_ID,
+                                        new HashMap<String, Grouping>())
+                                        .keySet()) {
+            for(Integer task: context.getComponentTasks(component)) {
+                _countOutTasks.add(task);
+            }
+        }
+        if(!_sourceArgs.isEmpty()) {
+            _numSourceReports = 0;
+            for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
+                if(entry.getValue().singleCount) {
+                    _numSourceReports+=1;
+                } else {
+                    _numSourceReports+=context.getComponentTasks(entry.getKey()).size();
+                }
+            }
+        }
+    }
+
+    /**
+     * Re-evaluates completion for the id in field 0 of the tuple. Returns true
+     * if the id is known to have failed. When an id completes, the per-task
+     * emit counts are pushed downstream on the coordinated stream, any held
+     * coordination tuples are acked, and the tracking entry is removed.
+     */
+    private boolean checkFinishId(Tuple tup, TupleType type) {
+        Object id = tup.getValue(0);
+        boolean failed = false;
+        
+        synchronized(_tracked) {
+            TrackingInfo track = _tracked.get(id);
+            try {
+                if(track!=null) {
+                    boolean delayed = false;
+                    // Hold the coordination/id tuple until the batch resolves,
+                    // so its ack/fail reflects the batch outcome.
+                    if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
+                        track.ackTuples.add(tup);
+                        delayed = true;
+                    }
+                    if(track.failed) {
+                        failed = true;
+                        for(Tuple t: track.ackTuples) {
+                            _collector.fail(t);
+                        }
+                        _tracked.remove(id);
+                    } else if(track.receivedId
+                             && (_sourceArgs.isEmpty() ||
+                                  track.reportCount==_numSourceReports &&
+                                  track.expectedTupleCount == track.receivedTuples)){
+                        if(_delegate instanceof FinishedCallback) {
+                            ((FinishedCallback)_delegate).finishedId(id);
+                        }
+                        if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
+                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
+                        }
+                        // Announce this bolt's own per-task emit counts downstream.
+                        Iterator<Integer> outTasks = _countOutTasks.iterator();
+                        while(outTasks.hasNext()) {
+                            int task = outTasks.next();
+                            int numTuples = Utils.get(track.taskEmittedTuples, task, 0);
+                            _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
+                        }
+                        for(Tuple t: track.ackTuples) {
+                            _collector.ack(t);
+                        }
+                        track.finished = true;
+                        _tracked.remove(id);
+                    }
+                    if(!delayed && type!=TupleType.REGULAR) {
+                        if(track.failed) {
+                            _collector.fail(tup);
+                        } else {
+                            _collector.ack(tup);                            
+                        }
+                    }
+                } else {
+                    // Unknown id (already resolved or expired): fail coordination tuples.
+                    if(type!=TupleType.REGULAR) _collector.fail(tup);
+                }
+            } catch(FailedException e) {
+                LOG.error("Failed to finish batch", e);
+                for(Tuple t: track.ackTuples) {
+                    _collector.fail(t);
+                }
+                _tracked.remove(id);
+                failed = true;
+            }
+        }
+        return failed;
+    }
+
+    /**
+     * Routes a tuple by type: ID tuples mark the id as seen, COORD tuples
+     * accumulate expected counts, and regular tuples go to the delegate.
+     */
+    public void execute(Tuple tuple) {
+        Object id = tuple.getValue(0);
+        TrackingInfo track;
+        TupleType type = getTupleType(tuple);
+        synchronized(_tracked) {
+            track = _tracked.get(id);
+            if(track==null) {
+                track = new TrackingInfo();
+                if(_idStreamSpec==null) track.receivedId = true;
+                _tracked.put(id, track);
+            }
+        }
+        
+        if(type==TupleType.ID) {
+            synchronized(_tracked) {
+                track.receivedId = true;
+            }
+            checkFinishId(tuple, type);            
+        } else if(type==TupleType.COORD) {
+            int count = (Integer) tuple.getValue(1);
+            synchronized(_tracked) {
+                track.reportCount++;
+                track.expectedTupleCount+=count;
+            }
+            checkFinishId(tuple, type);
+        } else {            
+            synchronized(_tracked) {
+                _delegate.execute(tuple);
+            }
+        }
+    }
+
+    public void cleanup() {
+        _delegate.cleanup();
+        _tracked.cleanup();
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        _delegate.declareOutputFields(declarer);
+        // Coordination stream carries (id, emitted-tuple-count) pairs, direct.
+        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return _delegate.getComponentConfiguration();
+    }
+    
+    // Convenience: wraps a single source/args pair into a map.
+    private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
+        Map<String, SourceArgs> ret = new HashMap<>();
+        ret.put(sourceComponent, sourceArgs);
+        return ret;
+    }
+    
+    private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
+        @Override
+        public void expire(Object id, TrackingInfo val) {
+            synchronized(_tracked) {
+                // the combination of the lock and the finished flag ensure that
+                // an id is never timed out if it has been finished
+                val.failed = true;
+                if(!val.finished) {
+                    ((TimeoutCallback) _delegate).timeoutId(id);
+                }
+            }
+        }
+    }
+    
+    /** Classifies a tuple as ID, COORD, or REGULAR based on its source stream. */
+    private TupleType getTupleType(Tuple tuple) {
+        if(_idStreamSpec!=null
+                && tuple.getSourceGlobalStreamId().equals(_idStreamSpec._id)) {
+            return TupleType.ID;
+        } else if(!_sourceArgs.isEmpty()
+                && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+            return TupleType.COORD;
+        } else {
+            return TupleType.REGULAR;
+        }
+    }
+    
+    static enum TupleType {
+        REGULAR,
+        ID,
+        COORD
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
new file mode 100644
index 0000000..7ea46b0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A bolt that processes tuples in discrete batches: {@link #prepare} is called
+ * once per batch with the batch id, {@link #execute} for each tuple in the
+ * batch, and {@link #finishBatch} when the batch is complete.
+ *
+ * @param <T> type of the batch id passed to {@link #prepare}
+ */
+public interface IBatchBolt<T> extends Serializable, IComponent {
+    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
+    void execute(Tuple tuple);
+    void finishBatch();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..339e91c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * The acker bolt: tracks the XOR "ack val" of each tuple tree keyed by its
+ * root id, and direct-emits a completion, failure, or timeout-reset message
+ * back to the originating spout task. Pending entries are aged out by a
+ * RotatingMap that rotates on tick tuples.
+ */
+public class Acker implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
+
+    // Number of buckets in the rotating pending map; one bucket is discarded
+    // per rotation (i.e. per tick tuple).
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    /** Per-tuple-tree tracking state: running XOR value, spout task, and status. */
+    private class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+        public long startTime = System.currentTimeMillis();
+
+        // val xor value
+        public void updateAck(Long value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        // Tick tuples drive expiry: rotate out the oldest bucket of pending ids.
+        if (TupleUtils.isTick(input)) {
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+            return;
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // If receiving bolt's ack before the init message from spout, just update the xor value.
+                curr.updateAck(input.getLong(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getLong(1));
+            } else {
+                // Ack arrived before init: start tracking now; spoutTask stays
+                // null until the init message fills it in.
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            // For the case that ack_fail message arrives before ack_init
+            if (curr == null) {
+                curr = new AckObject();
+            }
+            curr.failed = true;
+            pending.put(id, curr);
+        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            // Re-inserting refreshes the id into the newest rotation bucket,
+            // extending its timeout.
+            if (curr == null) {
+                curr = new AckObject();
+            }
+            pending.put(id, curr);
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        // NOTE(review): curr is non-null on every path reaching here (the
+        // unknown-stream branch returns above), so the null check below is
+        // redundant — and the dereference on the next line precedes it anyway.
+        Integer task = curr.spoutTask;
+        if (curr != null && task != null) {
+            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
+            if (curr.val == 0) {
+                // XOR collapsed to zero: the whole tuple tree has been acked.
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
+            } else if (curr.failed) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
+            } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
+            }
+        }
+
+        collector.ack(input);
+    }
+
+    @Override
+    public void cleanup() {
+        LOG.info("Acker: cleanup successfully");
+    }
+
+    /** Milliseconds elapsed since the given wall-clock start time. */
+    private long getTimeDeltaMillis(long startTimeMillis) {
+        return System.currentTimeMillis() - startTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
new file mode 100644
index 0000000..d1b71a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+/** Common daemon hook: reports whether the daemon is currently idle/waiting. */
+public interface DaemonCommon {
+    public boolean isWaiting();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
new file mode 100644
index 0000000..c4fdaf9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareShuffleGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.grouping.ShuffleGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Translates a Thrift {@link Grouping} declaration into an executable
+ * {@link LoadAwareCustomStreamGrouping} for routing tuples to target tasks.
+ */
+public class GrouperFactory {
+
+    /**
+     * Builds a prepared grouper for one (component, stream) edge.
+     * Non-load-aware groupers are wrapped so the returned object always
+     * satisfies the load-aware interface.
+     */
+    public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields,
+                                    Grouping thriftGrouping,
+                                    List<Integer> unsortedTargetTasks,
+                                    Map topoConf) {
+        List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
+        // Load-aware shuffle is the default; it is disabled only when the
+        // topology config explicitly sets TOPOLOGY_DISABLE_LOADAWARE_MESSAGING.
+        final boolean isNotLoadAware = (null != topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf
+            .get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING));
+        CustomStreamGrouping result = null;
+        switch (Thrift.groupingType(thriftGrouping)) {
+            case FIELDS:
+                // A fields grouping with an empty field list is Thrift's
+                // encoding of a global grouping.
+                if (Thrift.isGlobalGrouping(thriftGrouping)) {
+                    result = new GlobalGrouper();
+                } else {
+                    result = new FieldsGrouper(outFields, thriftGrouping);
+                }
+                break;
+            case SHUFFLE:
+                if (isNotLoadAware) {
+                    result = new ShuffleGrouping();
+                } else {
+                    result = new LoadAwareShuffleGrouping();
+                }
+                break;
+            case ALL:
+                result = new AllGrouper();
+                break;
+            case LOCAL_OR_SHUFFLE:
+                // Prefer local tasks as target tasks if possible
+                Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
+                targetTasks = (sameTasks.isEmpty()) ? targetTasks : new ArrayList<>(sameTasks);
+                if (isNotLoadAware) {
+                    result = new ShuffleGrouping();
+                } else {
+                    result = new LoadAwareShuffleGrouping();
+                }
+                break;
+            case NONE:
+                result = new NoneGrouper();
+                break;
+            case CUSTOM_OBJECT:
+                result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
+                break;
+            case CUSTOM_SERIALIZED:
+                result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
+                break;
+            case DIRECT:
+                result = DIRECT;
+                break;
+            default:
+                result = null;
+                break;
+        }
+
+        if (null != result) {
+            result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
+        }
+
+        // NOTE(review): if result is null (default case) this wraps null and
+        // the wrapper will NPE on first use — confirm whether an unknown
+        // grouping type is possible here.
+        if (result instanceof LoadAwareCustomStreamGrouping) {
+            return (LoadAwareCustomStreamGrouping) result;
+        } else {
+            return new BasicLoadAwareCustomStreamGrouping (result);
+        }
+    }
+
+    /**
+     * A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping
+     * (the load argument is ignored; routing delegates to the wrapped grouper).
+     */
+    public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
+
+        private final CustomStreamGrouping customStreamGrouping;
+
+        public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
+            this.customStreamGrouping = customStreamGrouping;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+            return customStreamGrouping.chooseTasks(taskId, values);
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            customStreamGrouping.prepare(context, stream, targetTasks);
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return customStreamGrouping.chooseTasks(taskId, values);
+        }
+    }
+
+    /** Routes a tuple by hashing the values of the configured grouping fields. */
+    public static class FieldsGrouper implements CustomStreamGrouping {
+
+        private Fields outFields;
+        private List<Integer> targetTasks;
+        private Fields groupFields;
+        private int numTasks;
+
+        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
+            this.outFields = outFields;
+            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
+
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+            this.numTasks = targetTasks.size();
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            // Hash only the selected grouping fields so equal keys always map
+            // to the same target task.
+            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
+            return Collections.singletonList(targetTasks.get(targetTaskIndex));
+        }
+
+    }
+
+    /** Sends every tuple to the first (lowest-id) target task. */
+    public static class GlobalGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+
+        public GlobalGrouper() {
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            if (targetTasks.isEmpty()) {
+                return null;
+            }
+            // It's possible for target to have multiple tasks if it reads multiple sources
+            return Collections.singletonList(targetTasks.get(0));
+        }
+    }
+
+    /** Routes each tuple to a uniformly random target task. */
+    public static class NoneGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+        private int numTasks;
+        private final Random random;
+
+        public NoneGrouper() {
+            random = new Random();
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+            this.numTasks = targetTasks.size();
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            int index = random.nextInt(numTasks);
+            return Collections.singletonList(targetTasks.get(index));
+        }
+    }
+
+    /** Broadcasts every tuple to all target tasks. */
+    public static class AllGrouper implements CustomStreamGrouping {
+
+        private List<Integer> targetTasks;
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+            this.targetTasks = targetTasks;
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return targetTasks;
+        }
+    }
+
+    // A no-op grouper: direct groupings pick their task at emit time, so
+    // chooseTasks intentionally returns null.
+    public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+            return null;
+        }
+
+        @Override
+        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+
+        }
+
+        @Override
+        public List<Integer> chooseTasks(int taskId, List<Object> values) {
+            return null;
+        }
+
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java b/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
new file mode 100644
index 0000000..fe9e6c9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+/** Anything that can be cleanly shut down, releasing its resources. */
+public interface Shutdownable {
+    public void shutdown();
+}