You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:31 UTC
[42/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
new file mode 100755
index 0000000..c38f5fe
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DevicesCore implements CgroupCore {
+
+ private final String dir;
+
+ private static final String DEVICES_ALLOW = "/devices.allow";
+ private static final String DEVICES_DENY = "/devices.deny";
+ private static final String DEVICES_LIST = "/devices.list";
+
+ private static final char TYPE_ALL = 'a';
+ private static final char TYPE_BLOCK = 'b';
+ private static final char TYPE_CHAR = 'c';
+
+ private static final int ACCESS_READ = 1;
+ private static final int ACCESS_WRITE = 2;
+ private static final int ACCESS_CREATE = 4;
+
+ private static final char ACCESS_READ_CH = 'r';
+ private static final char ACCESS_WRITE_CH = 'w';
+ private static final char ACCESS_CREATE_CH = 'm';
+
+ private static final Logger LOG = LoggerFactory.getLogger(DevicesCore.class);
+
+ public DevicesCore(String dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public SubSystemType getType() {
+ return SubSystemType.devices;
+ }
+
+ public static class Record {
+ Device device;
+ char type;
+ int accesses;
+
+ public Record(char type, Device device, int accesses) {
+ this.type = type;
+ this.device = device;
+ this.accesses = accesses;
+ }
+
+ public Record(String output) {
+ if (output.contains("*")) {
+ LOG.debug("Pre: {}", output);
+ output = output.replaceAll("\\*", "-1");
+ LOG.debug("After: {}",output);
+ }
+ String[] splits = output.split("[: ]");
+ type = splits[0].charAt(0);
+ int major = Integer.parseInt(splits[1]);
+ int minor = Integer.parseInt(splits[2]);
+ device = new Device(major, minor);
+ accesses = 0;
+ for (char c : splits[3].toCharArray()) {
+ if (c == ACCESS_READ_CH) {
+ accesses |= ACCESS_READ;
+ }
+ if (c == ACCESS_CREATE_CH) {
+ accesses |= ACCESS_CREATE;
+ }
+ if (c == ACCESS_WRITE_CH) {
+ accesses |= ACCESS_WRITE;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(type);
+ sb.append(' ');
+ sb.append(device.major);
+ sb.append(':');
+ sb.append(device.minor);
+ sb.append(' ');
+ sb.append(getAccessesFlag(accesses));
+
+ return sb.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + accesses;
+ result = prime * result + ((device == null) ? 0 : device.hashCode());
+ result = prime * result + type;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (accesses != other.accesses) {
+ return false;
+ }
+ if (device == null) {
+ if (other.device != null) {
+ return false;
+ }
+ } else if (!device.equals(other.device)) {
+ return false;
+ }
+ if (type != other.type) {
+ return false;
+ }
+ return true;
+ }
+
+ public static Record[] parseRecordList(List<String> output) {
+ Record[] records = new Record[output.size()];
+ for (int i = 0, l = output.size(); i < l; i++) {
+ records[i] = new Record(output.get(i));
+ }
+
+ return records;
+ }
+
+ public static StringBuilder getAccessesFlag(int accesses) {
+ StringBuilder sb = new StringBuilder();
+ if ((accesses & ACCESS_READ) != 0) {
+ sb.append(ACCESS_READ_CH);
+ }
+ if ((accesses & ACCESS_WRITE) != 0) {
+ sb.append(ACCESS_WRITE_CH);
+ }
+ if ((accesses & ACCESS_CREATE) != 0) {
+ sb.append(ACCESS_CREATE_CH);
+ }
+ return sb;
+ }
+ }
+
+ private void setPermission(String prop, char type, Device device, int accesses) throws IOException {
+ Record record = new Record(type, device, accesses);
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, prop), record.toString());
+ }
+
+ public void setAllow(char type, Device device, int accesses) throws IOException {
+ setPermission(DEVICES_ALLOW, type, device, accesses);
+ }
+
+ public void setDeny(char type, Device device, int accesses) throws IOException {
+ setPermission(DEVICES_DENY, type, device, accesses);
+ }
+
+ public Record[] getList() throws IOException {
+ List<String> output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, DEVICES_LIST));
+ return Record.parseRecordList(output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
new file mode 100755
index 0000000..89e13dd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+
+public class FreezerCore implements CgroupCore {
+
+ public static final String FREEZER_STATE = "/freezer.state";
+
+ private final String dir;
+
+ public FreezerCore(String dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public SubSystemType getType() {
+ return SubSystemType.freezer;
+ }
+
+ public void setState(State state) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase());
+ }
+
+ public State getState() throws IOException {
+ return State.getStateValue(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE)).get(0));
+ }
+
+ public enum State {
+ frozen, freezing, thawed;
+
+ public static State getStateValue(String state) {
+ if (state.equals("FROZEN")) {
+ return frozen;
+ }
+ else if (state.equals("FREEZING")) {
+ return freezing;
+ }
+ else if (state.equals("THAWED")) {
+ return thawed;
+ }
+ else {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
new file mode 100755
index 0000000..9bd6a72
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+
+/**
+ * Accessor for the {@code memory.*} files located under a single cgroup directory.
+ */
+public class MemoryCore implements CgroupCore {
+
+    public static final String MEMORY_STAT = "/memory.stat";
+    public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes";
+    public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes";
+    public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes";
+    public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes";
+    public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes";
+    public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes";
+    public static final String MEMORY_FAILCNT = "/memory.failcnt";
+    public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt";
+    public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty";
+    public static final String MEMORY_SWAPPINESS = "/memory.swappiness";
+    public static final String MEMORY_USE_HIERARCHY = "/memory.use_hierarchy";
+    public static final String MEMORY_OOM_CONTROL = "/memory.oom_control";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory whose memory files this instance reads and writes
+     */
+    public MemoryCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.memory;
+    }
+
+    /**
+     * Immutable snapshot of the counters parsed from {@code memory.stat} content.
+     */
+    public static class Stat {
+        // Counter names, listed in the positional order of the fields below.
+        // NOTE(review): the last two names are not known to be emitted by the kernel;
+        // they only mirror the two trailing fields - confirm against the target kernel.
+        private static final String[] KEYS = {
+            "cache", "rss", "mapped_file", "pgpgin", "pgpgout", "swap",
+            "inactive_anon", "active_anon", "inactive_file", "active_file", "unevictable",
+            "hierarchical_memory_limit", "hierarchical_memsw_limit",
+            "total_cache", "total_rss", "total_mapped_file", "total_pgpgin", "total_pgpgout", "total_swap",
+            "total_inactive_anon", "total_active_anon", "total_inactive_file", "total_active_file",
+            "total_unevictable",
+            "total_hierarchical_memory_limit", "total_hierarchical_memsw_limit"
+        };
+
+        public final long cacheSize;
+        public final long rssSize;
+        public final long mappedFileSize;
+        public final long pgpginNum;
+        public final long pgpgoutNum;
+        public final long swapSize;
+        public final long activeAnonSize;
+        public final long inactiveAnonSize;
+        public final long activeFileSize;
+        public final long inactiveFileSize;
+        public final long unevictableSize;
+        public final long hierarchicalMemoryLimitSize;
+        public final long hierarchicalMemSwapLimitSize;
+        public final long totalCacheSize;
+        public final long totalRssSize;
+        public final long totalMappedFileSize;
+        public final long totalPgpginNum;
+        public final long totalPgpgoutNum;
+        public final long totalSwapSize;
+        public final long totalActiveAnonSize;
+        public final long totalInactiveAnonSize;
+        public final long totalActiveFileSize;
+        public final long totalInactiveFileSize;
+        public final long totalUnevictableSize;
+        public final long totalHierarchicalMemoryLimitSize;
+        public final long totalHierarchicalMemSwapLimitSize;
+
+        /**
+         * Parses newline-separated statistics. Two layouts are accepted:
+         * <ul>
+         * <li>bare numbers, one per line, in the fixed positional order of this class
+         * (the only layout accepted previously);</li>
+         * <li>{@code "<name> <value>"} lines, matched by name in any order; unknown names
+         * are ignored and names that do not appear leave the field at 0.</li>
+         * </ul>
+         *
+         * @param output newline-separated statistics text
+         * @throws NumberFormatException if a value is not a valid long
+         * @throws ArrayIndexOutOfBoundsException if the bare-number layout has fewer than 26 lines
+         */
+        public Stat(String output) {
+            long[] v = parseValues(output);
+            this.cacheSize = v[0];
+            this.rssSize = v[1];
+            this.mappedFileSize = v[2];
+            this.pgpginNum = v[3];
+            this.pgpgoutNum = v[4];
+            this.swapSize = v[5];
+            this.inactiveAnonSize = v[6];
+            this.activeAnonSize = v[7];
+            this.inactiveFileSize = v[8];
+            this.activeFileSize = v[9];
+            this.unevictableSize = v[10];
+            this.hierarchicalMemoryLimitSize = v[11];
+            this.hierarchicalMemSwapLimitSize = v[12];
+            this.totalCacheSize = v[13];
+            this.totalRssSize = v[14];
+            this.totalMappedFileSize = v[15];
+            this.totalPgpginNum = v[16];
+            this.totalPgpgoutNum = v[17];
+            this.totalSwapSize = v[18];
+            this.totalInactiveAnonSize = v[19];
+            this.totalActiveAnonSize = v[20];
+            this.totalInactiveFileSize = v[21];
+            this.totalActiveFileSize = v[22];
+            this.totalUnevictableSize = v[23];
+            this.totalHierarchicalMemoryLimitSize = v[24];
+            this.totalHierarchicalMemSwapLimitSize = v[25];
+        }
+
+        /** Extracts the 26 counter values, indexed like {@link #KEYS}, from either layout. */
+        private static long[] parseValues(String output) {
+            String[] lines = output.split("\n");
+            long[] values = new long[KEYS.length];
+
+            boolean keyed = false;
+            for (String line : lines) {
+                if (line.trim().split("\\s+").length >= 2) {
+                    keyed = true;
+                    break;
+                }
+            }
+
+            if (!keyed) {
+                // Legacy layout: one bare number per line, strictly positional.
+                for (int i = 0; i < values.length; i++) {
+                    values[i] = Long.parseLong(lines[i]);
+                }
+                return values;
+            }
+
+            for (String line : lines) {
+                String[] tokens = line.trim().split("\\s+");
+                if (tokens.length < 2) {
+                    continue;
+                }
+                int idx = indexOfKey(tokens[0]);
+                if (idx >= 0) {
+                    values[idx] = Long.parseLong(tokens[1]);
+                }
+            }
+            return values;
+        }
+
+        /** Returns the position of {@code key} in {@link #KEYS}, or -1 if it is not listed. */
+        private static int indexOfKey(String key) {
+            for (int i = 0; i < KEYS.length; i++) {
+                if (KEYS[i].equals(key)) {
+                    return i;
+                }
+            }
+            return -1;
+        }
+    }
+
+    /** Returns the first element read from the given file under the cgroup directory. */
+    private String readFirstLine(String file) throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, file)).get(0);
+    }
+
+    /** Writes {@code value} to the given file under the cgroup directory. */
+    private void writeValue(String file, String value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, file), value);
+    }
+
+    /**
+     * Reads {@code memory.stat} and parses it into a {@link Stat}.
+     *
+     * @throws IOException if the underlying read fails
+     */
+    public Stat getStat() throws IOException {
+        // Join every returned element: passing only the first one cannot supply
+        // all counters when the reader returns one element per line.
+        StringBuilder joined = new StringBuilder();
+        for (String line : CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_STAT))) {
+            if (joined.length() > 0) {
+                joined.append('\n');
+            }
+            joined.append(line);
+        }
+        return new Stat(joined.toString());
+    }
+
+    /** Reads {@code memory.usage_in_bytes}. */
+    public long getPhysicalUsage() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_USAGE_IN_BYTES));
+    }
+
+    /** Reads {@code memory.memsw.usage_in_bytes}. */
+    public long getWithSwapUsage() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_MEMSW_USAGE_IN_BYTES));
+    }
+
+    /** Reads {@code memory.max_usage_in_bytes}. */
+    public long getMaxPhysicalUsage() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_MAX_USAGE_IN_BYTES));
+    }
+
+    /** Reads {@code memory.memsw.max_usage_in_bytes}. */
+    public long getMaxWithSwapUsage() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_MEMSW_MAX_USAGE_IN_BYTES));
+    }
+
+    /** Writes {@code value} to {@code memory.limit_in_bytes}. */
+    public void setPhysicalUsageLimit(long value) throws IOException {
+        writeValue(MEMORY_LIMIT_IN_BYTES, String.valueOf(value));
+    }
+
+    /** Reads {@code memory.limit_in_bytes}. */
+    public long getPhysicalUsageLimit() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_LIMIT_IN_BYTES));
+    }
+
+    /** Writes {@code value} to {@code memory.memsw.limit_in_bytes}. */
+    public void setWithSwapUsageLimit(long value) throws IOException {
+        writeValue(MEMORY_MEMSW_LIMIT_IN_BYTES, String.valueOf(value));
+    }
+
+    /** Reads {@code memory.memsw.limit_in_bytes}. */
+    public long getWithSwapUsageLimit() throws IOException {
+        return Long.parseLong(readFirstLine(MEMORY_MEMSW_LIMIT_IN_BYTES));
+    }
+
+    /** Reads {@code memory.failcnt}. */
+    public int getPhysicalFailCount() throws IOException {
+        return Integer.parseInt(readFirstLine(MEMORY_FAILCNT));
+    }
+
+    /** Reads {@code memory.memsw.failcnt}. */
+    public int getWithSwapFailCount() throws IOException {
+        return Integer.parseInt(readFirstLine(MEMORY_MEMSW_FAILCNT));
+    }
+
+    /** Writes {@code 0} to {@code memory.force_empty}. */
+    public void clearForceEmpty() throws IOException {
+        writeValue(MEMORY_FORCE_EMPTY, String.valueOf(0));
+    }
+
+    /** Writes {@code value} to {@code memory.swappiness}. */
+    public void setSwappiness(int value) throws IOException {
+        writeValue(MEMORY_SWAPPINESS, String.valueOf(value));
+    }
+
+    /** Reads {@code memory.swappiness}. */
+    public int getSwappiness() throws IOException {
+        return Integer.parseInt(readFirstLine(MEMORY_SWAPPINESS));
+    }
+
+    /** Writes {@code 1} (true) or {@code 0} (false) to {@code memory.use_hierarchy}. */
+    public void setUseHierarchy(boolean flag) throws IOException {
+        writeValue(MEMORY_USE_HIERARCHY, String.valueOf(flag ? 1 : 0));
+    }
+
+    /** Returns true when {@code memory.use_hierarchy} holds a value greater than zero. */
+    public boolean isUseHierarchy() throws IOException {
+        return Integer.parseInt(readFirstLine(MEMORY_USE_HIERARCHY)) > 0;
+    }
+
+    /** Writes {@code 1} (true) or {@code 0} (false) to {@code memory.oom_control}. */
+    public void setOomControl(boolean flag) throws IOException {
+        writeValue(MEMORY_OOM_CONTROL, String.valueOf(flag ? 1 : 0));
+    }
+
+    /**
+     * Returns true when the second whitespace-separated token on the first line of
+     * {@code memory.oom_control} is a number greater than zero.
+     */
+    public boolean isOomControl() throws IOException {
+        String firstLine = readFirstLine(MEMORY_OOM_CONTROL).split("\n")[0];
+        int value = Integer.parseInt(firstLine.split("[\\s]")[1]);
+        return value > 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
new file mode 100755
index 0000000..d3dd5a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+
+/**
+ * Accessor for the {@code net_cls.classid} file located under a single cgroup directory.
+ *
+ * <p>Convention used by this class: the <em>decimal digits</em> of the major and minor numbers
+ * are written verbatim as the hexadecimal digits of the class id (so {@code setClassId(10, 1)}
+ * writes {@code 0x00100001}), and {@link #getClassId()} reverses exactly that mapping.
+ */
+public class NetClsCore implements CgroupCore {
+
+    public static final String NET_CLS_CLASSID = "/net_cls.classid";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory whose {@code net_cls.classid} file this instance reads and writes
+     */
+    public NetClsCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_cls;
+    }
+
+    /**
+     * Renders the decimal digits of {@code num} as exactly four characters: left-padded with
+     * zeros when shorter, truncated to the last four digits when longer.
+     */
+    private StringBuilder toHex(int num) {
+        String digits = String.valueOf(num);
+        int length = digits.length();
+        StringBuilder sb = new StringBuilder();
+        if (length > 4) {
+            // Keep the last four digits (the previous code kept five, yielding a malformed id).
+            digits = digits.substring(length - 4);
+        }
+        for (int i = length; i < 4; i++) {
+            sb.append('0');
+        }
+        sb.append(digits);
+        return sb;
+    }
+
+    /**
+     * Writes {@code 0x<major><minor>} to {@code net_cls.classid}, each part rendered as four digits.
+     *
+     * @param major major handle; its decimal digits are used as the upper four digits
+     * @param minor minor handle; its decimal digits are used as the lower four digits
+     * @throws IOException if the underlying write fails
+     */
+    public void setClassId(int major, int minor) throws IOException {
+        StringBuilder sb = new StringBuilder("0x");
+        sb.append(toHex(major));
+        sb.append(toHex(minor));
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+    }
+
+    /**
+     * Reads {@code net_cls.classid} (a decimal number) and splits its hexadecimal form into the
+     * major part (everything before the last four digits) and the minor part (the last four digits).
+     *
+     * @return the major/minor pair
+     * @throws IOException if the underlying read fails
+     * @throws NumberFormatException if the content is not a decimal number, or if a part contains
+     *     the hex digits a-f, which this class's digit convention cannot represent
+     */
+    public Device getClassId() throws IOException {
+        String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+        // Parse as long: a 32-bit class id above 0x7fffffff does not fit in an int.
+        String hex = Long.toHexString(Long.parseLong(output.trim()));
+        // Left-pad to eight digits so small ids (e.g. 0) still split into two parts.
+        StringBuilder padded = new StringBuilder();
+        for (int i = hex.length(); i < 8; i++) {
+            padded.append('0');
+        }
+        padded.append(hex);
+        int split = padded.length() - 4;
+        int major = Integer.parseInt(padded.substring(0, split));
+        int minor = Integer.parseInt(padded.substring(split));
+        return new Device(major, minor);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
new file mode 100755
index 0000000..b83b81a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Accessor for the {@code net_prio.prioidx} and {@code net_prio.ifpriomap} files located
+ * under a single cgroup directory.
+ */
+public class NetPrioCore implements CgroupCore {
+
+    public static final String NET_PRIO_PRIOIDX = "/net_prio.prioidx";
+    public static final String NET_PRIO_IFPRIOMAP = "/net_prio.ifpriomap";
+
+    private final String dir;
+
+    /**
+     * @param dir cgroup directory whose net_prio files this instance reads and writes
+     */
+    public NetPrioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.net_prio;
+    }
+
+    /** Reads the first line of {@code net_prio.prioidx} as an int. */
+    public int getPrioId() throws IOException {
+        String path = CgroupUtils.getDir(this.dir, NET_PRIO_PRIOIDX);
+        String firstLine = CgroupUtils.readFileByLine(path).get(0);
+        return Integer.parseInt(firstLine);
+    }
+
+    /**
+     * Writes {@code "<iface> <priority>"} to {@code net_prio.ifpriomap}.
+     *
+     * @param iface interface name
+     * @param priority priority to associate with the interface
+     */
+    public void setIfPrioMap(String iface, int priority) throws IOException {
+        String entry = iface + ' ' + priority;
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP), entry);
+    }
+
+    /**
+     * Reads {@code net_prio.ifpriomap}; each line is split on a single space into an
+     * interface name and its priority.
+     *
+     * @return map from interface name to priority
+     */
+    public Map<String, Integer> getIfPrioMap() throws IOException {
+        List<String> lines = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP));
+        Map<String, Integer> priorities = new HashMap<String, Integer>();
+        for (String line : lines) {
+            String[] parts = line.split(" ");
+            String name = parts[0];
+            Integer value = Integer.valueOf(parts[1]);
+            priorities.put(name, value);
+        }
+        return priorities;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
new file mode 100644
index 0000000..2a9a1d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchBoltExecutor.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.coordination.CoordinatedBolt.TimeoutCallback;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bolt that keeps one {@link IBatchBolt} instance per batch id. Instances are created by
+ * deserializing the bolt passed to the constructor; the batch id is taken from the first
+ * value of each incoming tuple.
+ */
+public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
+    public static final Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+
+    byte[] _boltSer;
+    Map<Object, IBatchBolt> _openTransactions;
+    Map _conf;
+    TopologyContext _context;
+    BatchOutputCollectorImpl _collector;
+
+    /**
+     * @param bolt template bolt; it is serialized here and deserialized once per batch
+     */
+    public BatchBoltExecutor(IBatchBolt bolt) {
+        _boltSer = Utils.javaSerialize(bolt);
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _openTransactions = new HashMap<>();
+        _collector = new BatchOutputCollectorImpl(collector);
+        _context = context;
+        _conf = conf;
+    }
+
+    /**
+     * Routes the tuple to the bolt of its batch and acks it; a {@link FailedException}
+     * from the bolt is logged and the tuple is failed instead.
+     */
+    @Override
+    public void execute(Tuple input) {
+        IBatchBolt target = getBatchBolt(input.getValue(0));
+        try {
+            target.execute(input);
+            _collector.ack(input);
+        } catch (FailedException e) {
+            LOG.error("Failed to process tuple in batch", e);
+            _collector.fail(input);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    /** Drops the bolt of the finished batch from the open set, then lets it finish the batch. */
+    @Override
+    public void finishedId(Object id) {
+        IBatchBolt finished = getBatchBolt(id);
+        _openTransactions.remove(id);
+        finished.finishBatch();
+    }
+
+    /** Discards the bolt of a timed-out batch. */
+    @Override
+    public void timeoutId(Object attempt) {
+        _openTransactions.remove(attempt);
+    }
+
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        IBatchBolt template = newTransactionalBolt();
+        template.declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IBatchBolt template = newTransactionalBolt();
+        return template.getComponentConfiguration();
+    }
+
+    /** Returns the bolt registered for {@code id}, creating, preparing and registering it if absent. */
+    private IBatchBolt getBatchBolt(Object id) {
+        IBatchBolt existing = _openTransactions.get(id);
+        if (existing != null) {
+            return existing;
+        }
+        IBatchBolt created = newTransactionalBolt();
+        created.prepare(_conf, _context, _collector, id);
+        _openTransactions.put(id, created);
+        return created;
+    }
+
+    /** Deserializes a fresh copy of the bolt given to the constructor. */
+    private IBatchBolt newTransactionalBolt() {
+        return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
new file mode 100644
index 0000000..e1581eb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.utils.Utils;
+import java.util.List;
+
+/**
+ * Output collector abstraction used by batch bolts. The single-stream overloads
+ * delegate to the stream-aware abstract methods using {@code Utils.DEFAULT_STREAM_ID}.
+ */
+public abstract class BatchOutputCollector {
+
+    /**
+     * Emits a tuple on the default output stream.
+     *
+     * @param tuple values to emit
+     * @return whatever {@link #emit(String, List)} returns for the default stream
+     */
+    public List<Integer> emit(List<Object> tuple) {
+        final String defaultStream = Utils.DEFAULT_STREAM_ID;
+        return emit(defaultStream, tuple);
+    }
+
+    /**
+     * Emits a tuple on the given stream.
+     *
+     * @param streamId stream to emit on
+     * @param tuple values to emit
+     */
+    public abstract List<Integer> emit(String streamId, List<Object> tuple);
+
+    /**
+     * Emits a tuple to the specified task on the default output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message.
+     *
+     * @param taskId task that should receive the tuple
+     * @param tuple values to emit
+     */
+    public void emitDirect(int taskId, List<Object> tuple) {
+        final String defaultStream = Utils.DEFAULT_STREAM_ID;
+        emitDirect(taskId, defaultStream, tuple);
+    }
+
+    /**
+     * Emits a tuple to the specified task on the given stream.
+     *
+     * @param taskId task that should receive the tuple
+     * @param streamId stream to emit on
+     * @param tuple values to emit
+     */
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
+    /**
+     * Reports an error through the collector.
+     *
+     * @param error the error to report
+     */
+    public abstract void reportError(Throwable error);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
new file mode 100644
index 0000000..246ae5d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import java.util.List;
+
+public class BatchOutputCollectorImpl extends BatchOutputCollector {
+ OutputCollector _collector;
+
+ public BatchOutputCollectorImpl(OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple) {
+ return _collector.emit(streamId, tuple);
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+ _collector.emitDirect(taskId, streamId, tuple);
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ _collector.reportError(error);
+ }
+
+ public void ack(Tuple tup) {
+ _collector.ack(tup);
+ }
+
+ public void fail(Tuple tup) {
+ _collector.fail(tup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
new file mode 100644
index 0000000..bd72c89
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.Constants;
+import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.topology.BaseConfigurationDeclarer;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.InputDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Collects a master bolt and any number of further bolts together with their input
+ * declarations and component configurations, and registers all of them on a
+ * {@link TopologyBuilder} (see {@link #extendTopology}), each wrapped in a
+ * {@link CoordinatedBolt}.
+ */
+public class BatchSubtopologyBuilder {
+    Map<String, Component> _bolts = new HashMap<String, Component>();
+    Component _masterBolt;
+    String _masterId;
+
+    /**
+     * @param masterBoltId    id under which the master bolt is registered
+     * @param masterBolt      master bolt; it is wrapped in a {@link BasicBoltExecutor}
+     * @param boltParallelism parallelism of the master bolt, or {@code null} to pass none
+     */
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
+        _masterBolt = new Component(new BasicBoltExecutor(masterBolt), toInteger(boltParallelism));
+        _masterId = masterBoltId;
+    }
+
+    /** Same as the three-argument constructor with a {@code null} parallelism. */
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
+        this(masterBoltId, masterBolt, null);
+    }
+
+    /** Returns a new declarer that records inputs and configurations for the master bolt. */
+    public BoltDeclarer getMasterDeclarer() {
+        return new BoltDeclarerImpl(_masterBolt);
+    }
+
+    /** Registers a batch bolt under {@code id} with a {@code null} parallelism. */
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
+        return setBolt(id, bolt, null);
+    }
+
+    /** Registers a batch bolt under {@code id}; the bolt is wrapped in a {@link BatchBoltExecutor}. */
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
+        return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
+    }
+
+    /** Registers a basic bolt under {@code id} with a {@code null} parallelism. */
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
+        return setBolt(id, bolt, null);
+    }
+
+    /** Registers a basic bolt under {@code id}; the bolt is wrapped in a {@link BasicBoltExecutor}. */
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
+        return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
+    }
+
+    /**
+     * Stores the bolt under {@code id} (replacing any earlier entry with that id) and returns
+     * a declarer that records the bolt's inputs and configurations.
+     */
+    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
+        Component component = new Component(bolt, toInteger(parallelism));
+        _bolts.put(id, component);
+        return new BoltDeclarerImpl(component);
+    }
+
+    /** Converts an optional {@link Number} into an {@link Integer}, keeping {@code null} as is. */
+    private static Integer toInteger(Number value) {
+        return value == null ? null : value.intValue();
+    }
+
+    /**
+     * Adds the master bolt and every registered bolt to {@code builder}.
+     *
+     * <p>The master bolt is wrapped in a {@link CoordinatedBolt} without source arguments.
+     * Every other bolt is wrapped in a {@link CoordinatedBolt} whose source arguments hold
+     * one entry per component it subscribes to: {@link SourceArgs#single()} for the master
+     * bolt and {@link SourceArgs#all()} for any other component. Each of those bolts also
+     * gets a direct grouping on {@link Constants#COORDINATED_STREAM_ID} from every
+     * subscribed component before its recorded input declarations are replayed.
+     *
+     * @param builder topology builder that receives the bolts
+     */
+    public void extendTopology(TopologyBuilder builder) {
+        BoltDeclarer masterDeclarer =
+                builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
+        for (InputDeclaration declaration : _masterBolt.declarations) {
+            declaration.declare(masterDeclarer);
+        }
+        for (Map<String, Object> conf : _masterBolt.componentConfs) {
+            masterDeclarer.addConfigurations(conf);
+        }
+
+        for (Map.Entry<String, Component> entry : _bolts.entrySet()) {
+            String id = entry.getKey();
+            Component component = entry.getValue();
+            // Computed once; nothing below changes the component's declarations.
+            Set<String> subscriptions = componentBoltSubscriptions(component);
+
+            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
+            for (String source : subscriptions) {
+                SourceArgs args = source.equals(_masterId) ? SourceArgs.single() : SourceArgs.all();
+                coordinatedArgs.put(source, args);
+            }
+
+            BoltDeclarer input = builder.setBolt(
+                    id,
+                    new CoordinatedBolt(component.bolt, coordinatedArgs, null),
+                    component.parallelism);
+            for (Map<String, Object> conf : component.componentConfs) {
+                input.addConfigurations(conf);
+            }
+            for (String source : subscriptions) {
+                input.directGrouping(source, Constants.COORDINATED_STREAM_ID);
+            }
+            for (InputDeclaration declaration : component.declarations) {
+                declaration.declare(input);
+            }
+        }
+    }
+
+    /** Returns the distinct component ids named by the input declarations of {@code component}. */
+    private Set<String> componentBoltSubscriptions(Component component) {
+        Set<String> ret = new HashSet<String>();
+        for (InputDeclaration declaration : component.declarations) {
+            ret.add(declaration.getComponent());
+        }
+        return ret;
+    }
+
+    /** A bolt plus everything recorded for it: parallelism, input declarations, configurations. */
+    private static class Component {
+        public IRichBolt bolt;
+        public Integer parallelism;
+        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
+        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+
+        public Component(IRichBolt bolt, Integer parallelism) {
+            this.bolt = bolt;
+            this.parallelism = parallelism;
+        }
+    }
+
+    /** A recorded grouping call that can be replayed later on a real {@link InputDeclarer}. */
+    private static interface InputDeclaration {
+        /** Replays the recorded grouping on {@code declarer}. */
+        void declare(InputDeclarer declarer);
+
+        /** Returns the id of the component this declaration subscribes to. */
+        String getComponent();
+    }
+
+    /** {@link InputDeclaration} base class that stores the subscribed component id. */
+    private abstract static class ComponentDeclaration implements InputDeclaration {
+        private final String sourceComponent;
+
+        ComponentDeclaration(String sourceComponent) {
+            this.sourceComponent = sourceComponent;
+        }
+
+        @Override
+        public String getComponent() {
+            return sourceComponent;
+        }
+    }
+
+    /**
+     * {@link BoltDeclarer} that does not touch a topology: every grouping call is recorded as
+     * an {@link InputDeclaration} on the backing {@link Component}, and every configuration
+     * map is appended to that component's configuration list.
+     */
+    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+        Component _component;
+
+        public BoltDeclarerImpl(Component component) {
+            _component = component;
+        }
+
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, fields);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, streamId, fields);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer globalGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component, streamId);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component, streamId);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component, streamId);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer noneGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer noneGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component, streamId);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer allGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component, streamId);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer directGrouping(final String component) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer directGrouping(final String component, final String streamId) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component, streamId);
+                }
+            });
+        }
+
+        /** Records a custom grouping that uses a {@link PartialKeyGrouping} over {@code fields}. */
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        /** Stream-specific variant of {@link #partialKeyGrouping(String, Fields)}. */
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, grouping);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+            return addDeclaration(new ComponentDeclaration(component) {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, streamId, grouping);
+                }
+            });
+        }
+
+        @Override
+        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+            // Not a ComponentDeclaration: the component id is read from the stream each time
+            // getComponent() is called rather than captured when the grouping is recorded.
+            return addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.grouping(stream, grouping);
+                }
+
+                @Override
+                public String getComponent() {
+                    return stream.get_componentId();
+                }
+            });
+        }
+
+        /** Appends {@code declaration} to the backing component and returns this declarer. */
+        private BoltDeclarer addDeclaration(InputDeclaration declaration) {
+            _component.declarations.add(declaration);
+            return this;
+        }
+
+        /** Appends {@code conf} to the backing component's configuration list. */
+        @Override
+        public BoltDeclarer addConfigurations(Map<String, Object> conf) {
+            _component.componentConfs.add(conf);
+            return this;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
new file mode 100644
index 0000000..71919a0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.topology.FailedException;
+import java.util.Map.Entry;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.generated.GlobalStreamId;
+import java.util.Collection;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.TimeCacheMap;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps a delegate {@link IRichBolt} and tracks, per id (field 0 of every tuple), how many
+ * tuples were received, how many were emitted to each task, and how many coordination
+ * reports arrived. Once an id is complete, the per-task emit counts are sent downstream on
+ * {@link Constants#COORDINATED_STREAM_ID} and the delayed coordination tuples are acked.
+ *
+ * Coordination requires the request ids to be globally unique for a while. This is so it doesn't get confused
+ * in the case of retries.
+ */
+public class CoordinatedBolt implements IRichBolt {
+ public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
+
+ public static interface FinishedCallback { // optional interface for the delegate bolt
+ void finishedId(Object id); // called by checkFinishId once an id meets the completion condition
+ }
+
+ public static interface TimeoutCallback { // optional interface for the delegate bolt; enables the expiry callback in prepare()
+ void timeoutId(Object id); // called by TimeoutItems.expire for an expired id that was not marked finished
+ }
+
+
+ public static class SourceArgs implements Serializable {
+ public boolean singleCount; // true: prepare() expects one report from this source; false: one per task of the source component
+
+ protected SourceArgs(boolean singleCount) {
+ this.singleCount = singleCount;
+ }
+
+ public static SourceArgs single() {
+ return new SourceArgs(true);
+ }
+
+ public static SourceArgs all() {
+ return new SourceArgs(false);
+ }
+
+ @Override
+ public String toString() {
+ return "<Single: " + singleCount + ">";
+ }
+ }
+
+ public class CoordinatedOutputCollector implements IOutputCollector { // collector handed to the delegate; counts emits/acks per id before forwarding
+ IOutputCollector _delegate;
+
+ public CoordinatedOutputCollector(IOutputCollector delegate) {
+ _delegate = delegate;
+ }
+
+ public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
+ List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
+ updateTaskCounts(tuple.get(0), tasks); // field 0 of the emitted tuple is used as the tracking id
+ return tasks;
+ }
+
+ public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
+ updateTaskCounts(tuple.get(0), Arrays.asList(task)); // counted before the direct emit is forwarded
+ _delegate.emitDirect(task, stream, anchors, tuple);
+ }
+
+ public void ack(Tuple tuple) {
+ Object id = tuple.getValue(0);
+ synchronized(_tracked) {
+ TrackingInfo track = _tracked.get(id);
+ if (track != null)
+ track.receivedTuples++; // only ids that are still tracked are counted
+ }
+ boolean failed = checkFinishId(tuple, TupleType.REGULAR);
+ if(failed) { // the id failed, so the ack is turned into a fail
+ _delegate.fail(tuple);
+ } else {
+ _delegate.ack(tuple);
+ }
+ }
+
+ public void fail(Tuple tuple) {
+ Object id = tuple.getValue(0);
+ synchronized(_tracked) {
+ TrackingInfo track = _tracked.get(id);
+ if (track != null)
+ track.failed = true; // checkFinishId below then fails the delayed tuples and drops the id
+ }
+ checkFinishId(tuple, TupleType.REGULAR);
+ _delegate.fail(tuple);
+ }
+
+ public void resetTimeout(Tuple tuple) {
+ _delegate.resetTimeout(tuple);
+ }
+
+ public void reportError(Throwable error) {
+ _delegate.reportError(error);
+ }
+
+
+ private void updateTaskCounts(Object id, List<Integer> tasks) { // adds 1 to the emit count of every listed task, if the id is tracked
+ synchronized(_tracked) {
+ TrackingInfo track = _tracked.get(id);
+ if (track != null) {
+ Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
+ for(Integer task: tasks) {
+ int newCount = Utils.get(taskEmittedTuples, task, 0) + 1;
+ taskEmittedTuples.put(task, newCount);
+ }
+ }
+ }
+ }
+ }
+
+ private Map<String, SourceArgs> _sourceArgs; // source component id -> how its reports are counted; never null after construction
+ private IdStreamSpec _idStreamSpec; // when non-null, tuples from this stream are ID tuples (see getTupleType)
+ private IRichBolt _delegate;
+ private Integer _numSourceReports; // reports expected per id; set in prepare() only when _sourceArgs is non-empty
+ private List<Integer> _countOutTasks = new ArrayList<>(); // filled in prepare(); tasks that get a count tuple when an id finishes
+ private OutputCollector _collector;
+ private TimeCacheMap<Object, TrackingInfo> _tracked; // per-id state; also the lock guarding all TrackingInfo access
+
+ public static class TrackingInfo {
+ int reportCount = 0; // number of COORD tuples received for the id
+ int expectedTupleCount = 0; // sum of the counts carried by those COORD tuples
+ int receivedTuples = 0; // incremented on every ack through CoordinatedOutputCollector
+ boolean failed = false;
+ Map<Integer, Integer> taskEmittedTuples = new HashMap<>(); // task id -> tuples emitted to it for the id
+ boolean receivedId = false; // set by an ID tuple, or at creation when there is no _idStreamSpec
+ boolean finished = false; // set by checkFinishId on success; read by TimeoutItems.expire
+ List<Tuple> ackTuples = new ArrayList<>(); // COORD/ID tuples whose ack or fail is delayed until the id is resolved
+
+ @Override
+ public String toString() {
+ return "reportCount: " + reportCount + "\n" +
+ "expectedTupleCount: " + expectedTupleCount + "\n" +
+ "receivedTuples: " + receivedTuples + "\n" +
+ "failed: " + failed + "\n" +
+ taskEmittedTuples.toString();
+ }
+ }
+
+
+ public static class IdStreamSpec implements Serializable {
+ GlobalStreamId _id;
+
+ public GlobalStreamId getGlobalStreamId() {
+ return _id;
+ }
+
+ public static IdStreamSpec makeDetectSpec(String component, String stream) {
+ return new IdStreamSpec(component, stream);
+ }
+
+ protected IdStreamSpec(String component, String stream) {
+ _id = new GlobalStreamId(component, stream);
+ }
+ }
+
+ public CoordinatedBolt(IRichBolt delegate) {
+ this(delegate, null, null); // no sources and no id stream
+ }
+
+ public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
+ this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
+ }
+
+ public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
+ _sourceArgs = sourceArgs;
+ if(_sourceArgs==null) _sourceArgs = new HashMap<>(); // null is treated as "no sources"
+ _delegate = delegate;
+ _idStreamSpec = idStreamSpec;
+ }
+
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+ TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
+ if(_delegate instanceof TimeoutCallback) { // expiry is reported only to delegates that ask for it
+ callback = new TimeoutItems();
+ }
+ _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback); // NOTE(review): first argument is presumably the expiry time in seconds - confirm against TimeCacheMap
+ _collector = collector;
+ _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector))); // delegate emits/acks through the counting collector
+ for(String component: Utils.get(context.getThisTargets(),
+ Constants.COORDINATED_STREAM_ID,
+ new HashMap<String, Grouping>())
+ .keySet()) { // components listed by getThisTargets() for the coordinated stream
+ for(Integer task: context.getComponentTasks(component)) {
+ _countOutTasks.add(task);
+ }
+ }
+ if(!_sourceArgs.isEmpty()) {
+ _numSourceReports = 0;
+ for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
+ if(entry.getValue().singleCount) {
+ _numSourceReports+=1;
+ } else {
+ _numSourceReports+=context.getComponentTasks(entry.getKey()).size(); // one report per task of the source component
+ }
+ }
+ }
+ }
+
+ private boolean checkFinishId(Tuple tup, TupleType type) { // resolves the id of tup if it failed or completed; returns true when the id is failed
+ Object id = tup.getValue(0);
+ boolean failed = false;
+
+ synchronized(_tracked) {
+ TrackingInfo track = _tracked.get(id);
+ try {
+ if(track!=null) {
+ boolean delayed = false;
+ if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) { // COORD tuples (no id stream) or ID tuples (id stream set) are acked later
+ track.ackTuples.add(tup);
+ delayed = true;
+ }
+ if(track.failed) {
+ failed = true;
+ for(Tuple t: track.ackTuples) {
+ _collector.fail(t);
+ }
+ _tracked.remove(id);
+ } else if(track.receivedId
+ && (_sourceArgs.isEmpty() ||
+ track.reportCount==_numSourceReports &&
+ track.expectedTupleCount == track.receivedTuples)){ // complete: id seen and, if sources exist, all reports in and all announced tuples acked
+ if(_delegate instanceof FinishedCallback) {
+ ((FinishedCallback)_delegate).finishedId(id);
+ }
+ if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) { // i.e. sources are configured and the triggering tuple is REGULAR
+ throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
+ }
+ Iterator<Integer> outTasks = _countOutTasks.iterator();
+ while(outTasks.hasNext()) {
+ int task = outTasks.next();
+ int numTuples = Utils.get(track.taskEmittedTuples, task, 0);
+ _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples)); // (id, count) sent to each task, anchored on tup
+ }
+ for(Tuple t: track.ackTuples) {
+ _collector.ack(t);
+ }
+ track.finished = true;
+ _tracked.remove(id);
+ }
+ if(!delayed && type!=TupleType.REGULAR) { // non-delayed COORD/ID tuples are resolved right away
+ if(track.failed) {
+ _collector.fail(tup);
+ } else {
+ _collector.ack(tup);
+ }
+ }
+ } else {
+ if(type!=TupleType.REGULAR) _collector.fail(tup); // id is not tracked: fail COORD/ID tuples
+ }
+ } catch(FailedException e) { // NOTE(review): track is dereferenced below; would NPE if this were thrown while track == null - confirm that cannot happen
+ LOG.error("Failed to finish batch", e);
+ for(Tuple t: track.ackTuples) {
+ _collector.fail(t);
+ }
+ _tracked.remove(id);
+ failed = true;
+ }
+ }
+ return failed;
+ }
+
+ public void execute(Tuple tuple) {
+ Object id = tuple.getValue(0);
+ TrackingInfo track;
+ TupleType type = getTupleType(tuple);
+ synchronized(_tracked) {
+ track = _tracked.get(id);
+ if(track==null) { // first tuple seen for this id
+ track = new TrackingInfo();
+ if(_idStreamSpec==null) track.receivedId = true; // without an id stream nothing else will set the flag
+ _tracked.put(id, track);
+ }
+ }
+
+ if(type==TupleType.ID) {
+ synchronized(_tracked) {
+ track.receivedId = true;
+ }
+ checkFinishId(tuple, type);
+ } else if(type==TupleType.COORD) {
+ int count = (Integer) tuple.getValue(1); // field 1 of a COORD tuple carries the announced tuple count
+ synchronized(_tracked) {
+ track.reportCount++;
+ track.expectedTupleCount+=count;
+ }
+ checkFinishId(tuple, type);
+ } else {
+ synchronized(_tracked) { // the delegate executes while holding the _tracked lock
+ _delegate.execute(tuple);
+ }
+ }
+ }
+
+ public void cleanup() {
+ _delegate.cleanup();
+ _tracked.cleanup();
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ _delegate.declareOutputFields(declarer);
+ declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count")); // extra stream declared in addition to the delegate's streams
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return _delegate.getComponentConfiguration();
+ }
+
+ private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) { // builds a one-entry source map
+ Map<String, SourceArgs> ret = new HashMap<>();
+ ret.put(sourceComponent, sourceArgs);
+ return ret;
+ }
+
+ private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> { // installed in prepare() only when the delegate is a TimeoutCallback
+ @Override
+ public void expire(Object id, TrackingInfo val) {
+ synchronized(_tracked) {
+ // the combination of the lock and the finished flag ensure that
+ // an id is never timed out if it has been finished
+ val.failed = true;
+ if(!val.finished) {
+ ((TimeoutCallback) _delegate).timeoutId(id);
+ }
+ }
+ }
+ }
+
+ private TupleType getTupleType(Tuple tuple) { // ID if from the id stream, COORD if sources exist and the stream is the coordinated stream, else REGULAR
+ if(_idStreamSpec!=null
+ && tuple.getSourceGlobalStreamId().equals(_idStreamSpec._id)) {
+ return TupleType.ID;
+ } else if(!_sourceArgs.isEmpty()
+ && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+ return TupleType.COORD;
+ } else {
+ return TupleType.REGULAR;
+ }
+ }
+
+ static enum TupleType {
+ REGULAR,
+ ID,
+ COORD
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
new file mode 100644
index 0000000..7ea46b0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/coordination/IBatchBolt.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.coordination;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Map;
+
+public interface IBatchBolt<T> extends Serializable, IComponent { // serializable component; T is the type of the id passed to prepare()
+ void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id); // receives the configuration map, topology context, output collector and an id; NOTE(review): id presumably identifies the batch this instance handles - confirm against BatchBoltExecutor
+ void execute(Tuple tuple); // handles one input tuple
+ void finishBatch(); // NOTE(review): presumably called once after all tuples of the batch were executed - confirm against the caller
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..339e91c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Bolt that keeps, per id (field 0 of every input tuple), an XOR-accumulated value plus the
+ * spout task that registered the id, and notifies that task once the value reaches zero,
+ * the id is marked failed, or a timeout reset is requested.
+ *
+ * <p>Entries live in a {@link RotatingMap} with {@link #TIMEOUT_BUCKET_NUM} buckets that is
+ * rotated on every tick tuple.
+ */
+public class Acker implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
+
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    /**
+     * State kept for one pending id. Declared static because it never uses the enclosing
+     * {@link Acker} instance.
+     */
+    private static class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+        public long startTime = System.currentTimeMillis();
+
+        // val xor value
+        public void updateAck(Long value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    /** Stores the collector and creates an empty pending map. */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
+    }
+
+    /**
+     * Updates the pending entry for the tuple's id according to the source stream, then
+     * notifies the registered spout task when the entry is resolved.
+     *
+     * <ul>
+     * <li>tick tuple: rotates the pending map and returns without acking the input</li>
+     * <li>{@link #ACKER_INIT_STREAM_ID}: stores or XORs field 1 and records field 2 as the spout task</li>
+     * <li>{@link #ACKER_ACK_STREAM_ID}: stores or XORs field 1</li>
+     * <li>{@link #ACKER_FAIL_STREAM_ID}: marks the entry failed</li>
+     * <li>{@link #ACKER_RESET_TIMEOUT_STREAM_ID}: re-puts the entry into the pending map</li>
+     * <li>any other stream: logs a warning and returns without acking the input</li>
+     * </ul>
+     *
+     * @param input tuple whose field 0 is the tracked id
+     */
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+            return;
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // If receiving bolt's ack before the init message from spout, just update the xor value.
+                curr.updateAck(input.getLong(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getLong(1));
+            } else {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            // For the case that ack_fail message arrives before ack_init
+            if (curr == null) {
+                curr = new AckObject();
+            }
+            curr.failed = true;
+            pending.put(id, curr);
+        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+            }
+            pending.put(id, curr);
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        // curr is non-null here: every branch above either assigned it or returned.
+        // Nothing is emitted until a spout task has been recorded by an init tuple.
+        Integer task = curr.spoutTask;
+        if (task != null) {
+            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
+            if (curr.val == 0) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
+            } else if (curr.failed) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
+            } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
+            }
+        }
+
+        collector.ack(input);
+    }
+
+    /** Only logs; there is no state to release. */
+    @Override
+    public void cleanup() {
+        LOG.info("Acker: cleanup successfully");
+    }
+
+    /** Returns the milliseconds elapsed since {@code startTimeMillis}. */
+    private long getTimeDeltaMillis(long startTimeMillis) {
+        return System.currentTimeMillis() - startTimeMillis;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
new file mode 100644
index 0000000..d1b71a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/DaemonCommon.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+public interface DaemonCommon { // single-method contract implemented by daemons
+ public boolean isWaiting(); // NOTE(review): what "waiting" means is not visible here - confirm with the implementing daemons
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
new file mode 100644
index 0000000..c4fdaf9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareShuffleGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.grouping.ShuffleGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class GrouperFactory {
+
+ /**
+  * Creates and prepares the grouper described by {@code thriftGrouping} for one output stream.
+  *
+  * <p>The target tasks are sorted before use. For {@code LOCAL_OR_SHUFFLE} the targets are narrowed to the
+  * tasks of this worker whenever at least one target task is local. A grouper that is not already a
+  * {@link LoadAwareCustomStreamGrouping} is wrapped in a {@link BasicLoadAwareCustomStreamGrouping}.
+  *
+  * @param context worker topology context; passed to the grouper's {@code prepare} and queried for this worker's tasks
+  * @param componentId component part of the {@link GlobalStreamId} handed to {@code prepare}
+  * @param streamId stream part of the {@link GlobalStreamId} handed to {@code prepare}
+  * @param outFields fields of the stream, consulted by fields grouping
+  * @param thriftGrouping thrift definition of the grouping to build
+  * @param unsortedTargetTasks ids of the target tasks, in any order; a sorted copy is used internally
+  * @param topoConf topology configuration; only {@link Config#TOPOLOGY_DISABLE_LOADAWARE_MESSAGING} is read
+  * @return the prepared grouper, never {@code null}
+  * @throws IllegalArgumentException if no grouper can be created for {@code thriftGrouping}
+  */
+ public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields,
+                                                       Grouping thriftGrouping,
+                                                       List<Integer> unsortedTargetTasks,
+                                                       Map topoConf) {
+     List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
+     // Read the flag once; a missing entry leaves load-aware messaging enabled.
+     final Object disableLoadAware = topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING);
+     final boolean isNotLoadAware = (null != disableLoadAware) && (boolean) disableLoadAware;
+     CustomStreamGrouping result;
+     switch (Thrift.groupingType(thriftGrouping)) {
+         case FIELDS:
+             if (Thrift.isGlobalGrouping(thriftGrouping)) {
+                 result = new GlobalGrouper();
+             } else {
+                 result = new FieldsGrouper(outFields, thriftGrouping);
+             }
+             break;
+         case SHUFFLE:
+             result = newShuffleGrouper(isNotLoadAware);
+             break;
+         case ALL:
+             result = new AllGrouper();
+             break;
+         case LOCAL_OR_SHUFFLE:
+             // Prefer local tasks as target tasks if possible
+             Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
+             if (!sameTasks.isEmpty()) {
+                 targetTasks = new ArrayList<>(sameTasks);
+             }
+             result = newShuffleGrouper(isNotLoadAware);
+             break;
+         case NONE:
+             result = new NoneGrouper();
+             break;
+         case CUSTOM_OBJECT:
+             result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
+             break;
+         case CUSTOM_SERIALIZED:
+             result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
+             break;
+         case DIRECT:
+             result = DIRECT;
+             break;
+         default:
+             // Unknown grouping type: rejected by the null check below.
+             result = null;
+             break;
+     }
+
+     if (null == result) {
+         // Wrapping a null grouper would only surface as an NPE on the first chooseTasks call,
+         // far away from the cause, so reject it here with a descriptive message.
+         throw new IllegalArgumentException("Unable to create a grouper for grouping " + thriftGrouping
+                 + " on stream " + streamId + " of component " + componentId);
+     }
+
+     result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
+
+     if (result instanceof LoadAwareCustomStreamGrouping) {
+         return (LoadAwareCustomStreamGrouping) result;
+     }
+     return new BasicLoadAwareCustomStreamGrouping(result);
+ }
+
+ /**
+  * Picks the shuffle implementation that matches the load-aware setting.
+  *
+  * @param isNotLoadAware {@code true} when load-aware messaging is disabled
+  * @return a new {@link ShuffleGrouping} if load awareness is disabled, otherwise a new {@link LoadAwareShuffleGrouping}
+  */
+ private static CustomStreamGrouping newShuffleGrouper(boolean isNotLoadAware) {
+     if (isNotLoadAware) {
+         return new ShuffleGrouping();
+     }
+     return new LoadAwareShuffleGrouping();
+ }
+
+ /**
+  * Adapter that exposes a plain {@link CustomStreamGrouping} through the
+  * {@link LoadAwareCustomStreamGrouping} interface. Load information is ignored: every call is
+  * forwarded unchanged to the wrapped grouping.
+  */
+ public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
+
+     /** The grouping all calls are forwarded to. */
+     private final CustomStreamGrouping delegate;
+
+     /**
+      * @param customStreamGrouping the grouping to wrap
+      */
+     public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
+         delegate = customStreamGrouping;
+     }
+
+     /** Forwards the preparation to the wrapped grouping. */
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+         delegate.prepare(context, stream, targetTasks);
+     }
+
+     /** Delegates to the wrapped grouping. */
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+         return delegate.chooseTasks(taskId, values);
+     }
+
+     /** Delegates to the wrapped grouping; {@code load} is not consulted. */
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+         return delegate.chooseTasks(taskId, values);
+     }
+ }
+
+ /**
+  * Grouper for fields grouping: the target task is chosen from the values of the grouping fields,
+  * using the index computed by {@link TupleUtils#chooseTaskIndex}.
+  */
+ public static class FieldsGrouper implements CustomStreamGrouping {
+
+     /** All output fields of the stream. */
+     private final Fields outFields;
+     /** The subset of fields the grouping is based on. */
+     private final Fields groupFields;
+     private List<Integer> targetTasks;
+     /** Size of {@link #targetTasks} as seen when {@code prepare} was called. */
+     private int numTasks;
+
+     /**
+      * @param outFields output fields of the stream
+      * @param thriftGrouping thrift grouping from which the grouping field names are read
+      */
+     public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
+         this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
+         this.outFields = outFields;
+     }
+
+     /** Remembers the target tasks and how many there are. */
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+         this.numTasks = targetTasks.size();
+         this.targetTasks = targetTasks;
+     }
+
+     /**
+      * @return a single-element list holding the target task picked for the grouping field values
+      */
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+         final List<Object> groupValues = outFields.select(groupFields, values);
+         final int chosenIndex = TupleUtils.chooseTaskIndex(groupValues, numTasks);
+         final Integer chosenTask = targetTasks.get(chosenIndex);
+         return Collections.singletonList(chosenTask);
+     }
+ }
+
+ /**
+  * Grouper that sends every tuple to the first entry of the target task list.
+  */
+ public static class GlobalGrouper implements CustomStreamGrouping {
+
+     private List<Integer> targetTasks;
+
+     /** Stores the target tasks for later selection. */
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+         this.targetTasks = targetTasks;
+     }
+
+     /**
+      * @return a single-element list with the first target task, or {@code null} when there are no target tasks
+      */
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+         // It's possible for target to have multiple tasks if it reads multiple sources
+         return targetTasks.isEmpty() ? null : Collections.singletonList(targetTasks.get(0));
+     }
+ }
+
+ /**
+  * Grouper that picks one target task uniformly at random for every tuple.
+  */
+ public static class NoneGrouper implements CustomStreamGrouping {
+
+     /** Source of the random task index. */
+     private final Random random = new Random();
+     private List<Integer> targetTasks;
+     /** Size of {@link #targetTasks} as seen when {@code prepare} was called. */
+     private int numTasks;
+
+     /** Remembers the target tasks and how many there are. */
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+         this.numTasks = targetTasks.size();
+         this.targetTasks = targetTasks;
+     }
+
+     /**
+      * @return a single-element list holding the randomly selected target task
+      */
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+         final Integer picked = targetTasks.get(random.nextInt(numTasks));
+         return Collections.singletonList(picked);
+     }
+ }
+
+ /**
+  * Grouper that sends every tuple to all target tasks.
+  */
+ public static class AllGrouper implements CustomStreamGrouping {
+
+     /** The list received in {@code prepare}; returned as-is, without copying. */
+     private List<Integer> targetTasks;
+
+     /** Stores the target tasks. */
+     @Override
+     public void prepare(final WorkerTopologyContext context, final GlobalStreamId stream, final List<Integer> targetTasks) {
+         this.targetTasks = targetTasks;
+     }
+
+     /**
+      * @return the complete list of target tasks given to {@code prepare}
+      */
+     @Override
+     public List<Integer> chooseTasks(final int taskId, final List<Object> values) {
+         return this.targetTasks;
+     }
+ }
+
+ /**
+  * A no-op grouper, used by {@code mkGrouper} for the {@code DIRECT} grouping type. It never
+  * selects a task: both {@code chooseTasks} variants return {@code null} and {@code prepare}
+  * does nothing.
+  */
+ public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
+
+     @Override
+     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+         // intentionally empty
+     }
+
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values) {
+         return null;
+     }
+
+     @Override
+     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
+         return null;
+     }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java b/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
new file mode 100644
index 0000000..fe9e6c9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Shutdownable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+ /**
+  * Contract for objects that can be shut down.
+  */
+ public interface Shutdownable {
+     /**
+      * Shuts this object down; what that involves is defined by the implementation.
+      */
+     void shutdown();
+ }