You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/03/28 21:22:19 UTC
[09/10] storm git commit: [STORM-2693] Heartbeats and assignments
promotion for storm2.0
[STORM-2693] Heartbeats and assignments promotion for storm2.0
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0dac58b0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0dac58b0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0dac58b0
Branch: refs/heads/master
Commit: 0dac58b0aa82133df242b3b2ebeb65bfea7d63cc
Parents: adf4efb
Author: chenyuzhao <ch...@meituan.com>
Authored: Wed Mar 28 23:22:03 2018 +0800
Committer: chenyuzhao <ch...@meituan.com>
Committed: Wed Mar 28 23:22:03 2018 +0800
----------------------------------------------------------------------
conf/defaults.yaml | 16 +
.../src/jvm/org/apache/storm/Config.java | 72 +-
.../src/jvm/org/apache/storm/StormTimer.java | 24 +-
.../src/jvm/org/apache/storm/Thrift.java | 42 +-
.../assignments/ILocalAssignmentsBackend.java | 116 +
.../assignments/InMemoryAssignmentBackend.java | 158 +
.../LocalAssignmentsBackendFactory.java | 48 +
.../org/apache/storm/cluster/ClusterUtils.java | 19 +-
.../org/apache/storm/cluster/DaemonType.java | 2 +
.../storm/cluster/IStormClusterState.java | 78 +-
.../storm/cluster/StormClusterStateImpl.java | 124 +-
.../apache/storm/cluster/ZKStateStorage.java | 15 +-
.../storm/cluster/ZKStateStorageFactory.java | 7 +-
.../org/apache/storm/daemon/worker/Worker.java | 57 +-
.../apache/storm/daemon/worker/WorkerState.java | 49 +-
.../jvm/org/apache/storm/generated/HBNodes.java | 32 +-
.../org/apache/storm/generated/HBRecords.java | 36 +-
.../jvm/org/apache/storm/generated/Nimbus.java | 5135 +++++++++++++-----
.../org/apache/storm/generated/Supervisor.java | 2968 ++++++++++
.../storm/generated/SupervisorAssignments.java | 478 ++
.../apache/storm/generated/SupervisorInfo.java | 177 +-
.../generated/SupervisorWorkerHeartbeat.java | 660 +++
.../generated/SupervisorWorkerHeartbeats.java | 561 ++
.../storm/generated/WorkerMetricList.java | 36 +-
.../storm/generated/WorkerTokenServiceType.java | 5 +-
.../grouping/LoadAwareShuffleGrouping.java | 8 +-
.../apache/storm/security/auth/IAuthorizer.java | 6 +-
.../security/auth/ThriftConnectionType.java | 5 +-
.../auth/authorizer/SimpleACLAuthorizer.java | 7 +-
.../SupervisorSimpleACLAuthorizer.java | 157 +
.../auth/kerberos/ServerCallbackHandler.java | 3 +-
.../jvm/org/apache/storm/stats/StatsUtil.java | 141 +-
.../apache/storm/topology/TopologyBuilder.java | 21 +-
.../apache/storm/utils/SupervisorClient.java | 88 +
storm-client/src/py/storm/Nimbus-remote | 21 +
storm-client/src/py/storm/Nimbus.py | 633 ++-
storm-client/src/py/storm/Supervisor-remote | 126 +
storm-client/src/py/storm/Supervisor.py | 694 +++
storm-client/src/py/storm/__init__.py | 2 +-
storm-client/src/py/storm/ttypes.py | 369 +-
storm-client/src/storm.thrift | 46 +-
.../LocalAssignmentsBackendTest.java | 102 +
.../cluster/StormClusterStateImplTest.java | 5 +-
.../apache/storm/command/shell_submission.clj | 2 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 57 +-
.../storm/security/auth/nimbus_auth_test.clj | 4 +
.../org/apache/storm/stats/TestStatsUtil.java | 10 +-
.../java/org/apache/storm/DaemonConfig.java | 26 +
.../java/org/apache/storm/LocalCluster.java | 19 +
.../org/apache/storm/daemon/nimbus/Nimbus.java | 527 +-
.../storm/daemon/supervisor/BasicContainer.java | 40 +-
.../supervisor/BasicContainerLauncher.java | 13 +-
.../storm/daemon/supervisor/Container.java | 5 +-
.../daemon/supervisor/ContainerLauncher.java | 10 +-
.../storm/daemon/supervisor/LocalContainer.java | 7 +-
.../supervisor/LocalContainerLauncher.java | 7 +-
.../daemon/supervisor/ReadClusterState.java | 56 +-
.../daemon/supervisor/RunAsUserContainer.java | 19 +-
.../supervisor/RunAsUserContainerLauncher.java | 15 +-
.../storm/daemon/supervisor/Supervisor.java | 224 +-
.../daemon/supervisor/SupervisorUtils.java | 17 +-
.../timer/ReportWorkerHeartbeats.java | 112 +
.../supervisor/timer/SupervisorHeartbeat.java | 17 +-
.../timer/SynchronizeAssignments.java | 133 +
.../nimbus/AssignmentDistributionService.java | 312 ++
.../org/apache/storm/nimbus/ILeaderElector.java | 6 +-
.../IWorkerHeartbeatsRecoveryStrategy.java | 48 +
.../storm/nimbus/LeaderListenerCallback.java | 231 +
.../nimbus/NimbusHeartbeatsPressureTest.java | 241 +
...TimeOutWorkerHeartbeatsRecoveryStrategy.java | 80 +
...WorkerHeartbeatsRecoveryStrategyFactory.java | 53 +
.../storm/scheduler/SupervisorDetails.java | 27 +-
.../storm/zookeeper/LeaderElectorImp.java | 12 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 145 +-
.../daemon/supervisor/BasicContainerTest.java | 244 +-
.../storm/daemon/supervisor/ContainerTest.java | 16 +-
76 files changed, 14135 insertions(+), 1949 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d64623e..2ed4599 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -77,6 +77,10 @@ topology.max.replication.wait.time.sec: 60
nimbus.credential.renewers.freq.secs: 600
nimbus.queue.size: 100000
scheduler.display.resource: false
+nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
+nimbus.assignments.service.threads: 10
+nimbus.assignments.service.thread.queue.size: 100
+nimbus.worker.heartbeats.recovery.strategy.class: "org.apache.storm.nimbus.TimeOutWorkerHeartbeatsRecoveryStrategy"
### ui.* configs are for the master
ui.host: 0.0.0.0
@@ -154,6 +158,8 @@ supervisor.worker.shutdown.sleep.secs: 3
supervisor.monitor.frequency.secs: 3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
+#max timeout for a node's worker heartbeats when the master gains leadership
+supervisor.worker.heartbeats.max.timeout.secs: 600
supervisor.enable: true
supervisor.supervisors: []
supervisor.supervisors.commands: []
@@ -163,6 +169,13 @@ supervisor.memory.capacity.mb: 4096.0
# for single threaded bolts
supervisor.cpu.capacity: 400.0
+#Supervisor thrift config
+supervisor.thrift.port: 6628
+supervisor.queue.size: 128
+supervisor.thrift.threads: 16
+supervisor.thrift.max_buffer_size: 1048576
+supervisor.thrift.socket.timeout.ms: 5000
+
### worker.* configs are for task workers
worker.heap.memory.mb: 768
worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump"
@@ -182,6 +195,9 @@ worker.log.level.reset.poll.secs: 30
# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1
+# Executor metrics reporting interval.
+executor.metrics.frequency.secs: 60
+
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index ed6068c..b275985 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1102,6 +1102,13 @@ public class Config extends HashMap<String, Object> {
public static final String NIMBUS_QUEUE_SIZE = "nimbus.queue.size";
/**
+ * Nimbus assignments backend for storing local assignments. We will use it to store physical plan and runtime storm ids.
+ */
+ @isString
+ @isImplementationOfClass(implementsClass = org.apache.storm.assignments.ILocalAssignmentsBackend.class)
+ public static final String NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS = "nimbus.local.assignments.backend.class";
+
+ /**
* The number of threads that should be used by the nimbus thrift server.
*/
@isInteger
@@ -1411,6 +1418,44 @@ public class Config extends HashMap<String, Object> {
@isPositiveNumber
public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
+ /**
+ * Port used for supervisor thrift server.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String SUPERVISOR_THRIFT_PORT = "supervisor.thrift.port";
+
+ /**
+ * The Supervisor invocations transport plug-in for Thrift client/server communication.
+ */
+ @isString
+ public static final String SUPERVISOR_THRIFT_TRANSPORT_PLUGIN = "supervisor.thrift.transport";
+
+ /**
+ * Supervisor thrift server queue size.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String SUPERVISOR_QUEUE_SIZE = "supervisor.queue.size";
+
+ /**
+ * The number of threads that should be used by the supervisor thrift server.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String SUPERVISOR_THRIFT_THREADS = "supervisor.thrift.threads";
+
+ @isNumber
+ @isPositiveNumber
+ public static final String SUPERVISOR_THRIFT_MAX_BUFFER_SIZE = "supervisor.thrift.max_buffer_size";
+
+ /**
+ * How long a supervisor Thrift client socket may hang before it times out
+ * and the socket is restarted.
+ */
+ @isInteger
+ public static final String SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS = "supervisor.thrift.socket.timeout.ms";
+
/**
* A map of resources the Supervisor has e.g {"cpu.pcore.percent" : 200.0. "onheap.memory.mb": 256.0, "gpu.count" : 2.0 }
*/
@@ -1524,6 +1569,12 @@ public class Config extends HashMap<String, Object> {
public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user";
/**
+ * Max timeout for supervisor reported heartbeats when master gains leadership.
+ */
+ @isInteger
+ public static final String SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS = "supervisor.worker.heartbeats.max.timeout.secs";
+
+ /**
* On some systems (windows for example) symlinks require special privileges that not everyone wants to
* grant a headless user. You can completely disable the use of symlinks by setting this config to true, but
* by doing so you may also lose some features from storm. For example the blobstore feature
@@ -1572,12 +1623,20 @@ public class Config extends HashMap<String, Object> {
/**
* A list of users that run the supervisors and should be authorized to interact with
* nimbus as a supervisor would. To use this set
- * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
+ * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer.
*/
@isStringList
public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users";
/**
+ * A list of users that nimbus runs as and should be authorized to interact with
+ * the supervisor as nimbus would. To use this set supervisor.authorizer to
+ * org.apache.storm.security.auth.authorizer.SupervisorSimpleACLAuthorizer.
+ */
+ @isStringList
+ public static final String NIMBUS_DAEMON_USERS = "nimbus.daemon.users";
+
+ /**
* A list of users that are cluster admins and can run any command. To use this set
* nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
*/
@@ -1781,10 +1840,19 @@ public class Config extends HashMap<String, Object> {
public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
/**
- * How often a task should heartbeat its status to the master.
+ * How often executor metrics should report to master, used for RPC heartbeat mode.
*/
@isInteger
@isPositiveNumber
+ public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
+
+ /**
+ * How often a task should heartbeat its status to the master,
+ * deprecated for 2.0 RPC heartbeat reporting, see {@link #EXECUTOR_METRICS_FREQUENCY_SECS}.
+ */
+ @Deprecated
+ @isInteger
+ @isPositiveNumber
public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 0f54ce1..72b20d3 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -18,19 +18,19 @@
package org.apache.storm;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-
import java.nio.channels.ClosedByInterruptException;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
/**
* The timer defined in this file is very similar to java.util.Timer, except
* it integrates with Storm's time simulation capabilities. This lets us test
- * code that does asynchronous work on the timer thread
+ * code that does asynchronous work on the timer thread.
*/
public class StormTimer implements AutoCloseable {
@@ -131,7 +131,7 @@ public class StormTimer implements AutoCloseable {
private StormTimerTask task = new StormTimerTask();
/**
- * Makes a Timer in the form of a StormTimerTask Object
+ * Makes a Timer in the form of a StormTimerTask Object.
* @param name name of the timer
* @param onKill function to call when timer is killed unexpectedly
* @return StormTimerTask object that was initialized
@@ -154,7 +154,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * Schedule a function to be executed in the timer
+ * Schedule a function to be executed in the timer.
* @param delaySecs the number of seconds to delay before running the function
* @param func the function to run
* @param checkActive whether to check is the timer is active
@@ -195,7 +195,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * Schedule a function to run recurrently
+ * Schedule a function to run recurrently.
* @param delaySecs the number of seconds to delay before running the function
* @param recurSecs the time between each invocation
* @param func the function to run
@@ -212,7 +212,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * Schedule a function to run recurrently
+ * Schedule a function to run recurrently.
* @param delayMs the number of millis to delay before running the function
* @param recurMs the time between each invocation
* @param func the function to run
@@ -230,7 +230,7 @@ public class StormTimer implements AutoCloseable {
/**
- * schedule a function to run recurrently with jitter
+ * Schedule a function to run recurrently with jitter.
* @param delaySecs the number of seconds to delay before running the function
* @param recurSecs the time between each invocation
* @param jitterMs jitter added to the run
@@ -248,7 +248,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * check if timer is active
+ * check if timer is active.
*/
private void checkActive() {
if (!this.task.isActive()) {
@@ -257,7 +257,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * cancel timer
+ * cancel timer.
*/
@Override
@@ -270,7 +270,7 @@ public class StormTimer implements AutoCloseable {
}
/**
- * is timer waiting. Used in timer simulation
+ * is timer waiting. Used in timer simulation.
*/
public boolean isTimerWaiting() {
return Time.isThreadWaiting(task);
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
index e00b0ee..85f8a85 100644
--- a/storm-client/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -15,46 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm;
-import org.apache.storm.generated.Bolt;
-import org.apache.storm.generated.JavaObjectArg;
-import org.apache.storm.generated.SpoutSpec;
-import org.apache.storm.generated.StateSpoutSpec;
-import org.apache.storm.generated.StreamInfo;
+package org.apache.storm;
+import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.List;
import java.util.HashMap;
-import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.JavaObjectArg;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StormTopology._Fields;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.NullStruct;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.ComponentObject;
-
+import org.apache.storm.generated.StreamInfo;
import org.apache.storm.task.IBolt;
import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.SpoutDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.storm.topology.TopologyBuilder;
-
public class Thrift {
private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
@@ -328,19 +326,19 @@ public class Thrift {
public static StormTopology buildTopology(Map<String, SpoutDetails> spoutMap, Map<String, BoltDetails> boltMap) {
TopologyBuilder builder = new TopologyBuilder();
for (Entry<String, SpoutDetails> entry : spoutMap.entrySet()) {
- String spoutID = entry.getKey();
+ String spoutId = entry.getKey();
SpoutDetails spec = entry.getValue();
- SpoutDeclarer spoutDeclarer = builder.setSpout(spoutID, spec.getSpout(), spec.getParallelism());
+ SpoutDeclarer spoutDeclarer = builder.setSpout(spoutId, spec.getSpout(), spec.getParallelism());
spoutDeclarer.addConfigurations(spec.getConf());
}
for (Entry<String, BoltDetails> entry : boltMap.entrySet()) {
- String spoutID = entry.getKey();
+ String spoutId = entry.getKey();
BoltDetails spec = entry.getValue();
BoltDeclarer boltDeclarer = null;
if (spec.bolt instanceof IRichBolt) {
- boltDeclarer = builder.setBolt(spoutID, (IRichBolt)spec.getBolt(), spec.getParallelism());
+ boltDeclarer = builder.setBolt(spoutId, (IRichBolt)spec.getBolt(), spec.getParallelism());
} else {
- boltDeclarer = builder.setBolt(spoutID, (IBasicBolt)spec.getBolt(), spec.getParallelism());
+ boltDeclarer = builder.setBolt(spoutId, (IBasicBolt)spec.getBolt(), spec.getParallelism());
}
boltDeclarer.addConfigurations(spec.getConf());
addInputs(boltDeclarer, spec.getInputs());
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
new file mode 100644
index 0000000..10b0b0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/ILocalAssignmentsBackend.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.assignments;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.Assignment;
+
+/**
+ * Interface for storing local assignments.
+ *
+ * <p>{@link #close()} overrides {@link AutoCloseable#close()} without declaring a checked exception.
+ */
+public interface ILocalAssignmentsBackend extends AutoCloseable {
+    /**
+     * Decide if the assignments are synchronized from the remote state-store.
+     * @return whether the assignments have been marked as synchronized
+     */
+    boolean isSynchronized();
+
+    /**
+     * Mark this backend as synchronized when sync work is done.
+     */
+    void setSynchronized();
+
+    /**
+     * Initial function for creating backend.
+     * @param conf config
+     */
+    void prepare(Map conf);
+
+    /**
+     * Keep a storm assignment to local state or update old assignment.
+     * @param stormId storm runtime id
+     * @param assignment assignment as thrift
+     */
+    void keepOrUpdateAssignment(String stormId, Assignment assignment);
+
+    /**
+     * Get assignment as {@link Assignment} for a storm.
+     * @param stormId storm runtime id
+     * @return assignment
+     */
+    Assignment getAssignment(String stormId);
+
+    /**
+     * Remove the locally kept assignment of a storm.
+     * @param stormId storm runtime id
+     */
+    void removeAssignment(String stormId);
+
+    /**
+     * List all the storm runtime ids of local assignments.
+     * @return a list of storm ids
+     */
+    List<String> assignments();
+
+    /**
+     * Get all the local assignments of local state.
+     * @return mapping of storm-id to assignment
+     */
+    Map<String, Assignment> assignmentsInfo();
+
+    /**
+     * Sync remote assignments to local.
+     * @param remote serialized remote assignments keyed by storm id; null is meant to signal
+     *     "sync from zookeeper" [only used for nimbus]. NOTE(review): InMemoryAssignmentBackend
+     *     dereferences this map directly, so presumably callers resolve null first - confirm
+     */
+    void syncRemoteAssignments(Map<String, byte[]> remote);
+
+    /**
+     * Keep a mapping storm-name to storm-id to local state.
+     * @param stormName storm name
+     * @param stormId storm runtime id
+     */
+    void keepStormId(String stormName, String stormId);
+
+    /**
+     * Get storm runtime id from local.
+     * @param stormName name of a storm
+     * @return runtime storm id
+     */
+    String getStormId(String stormName);
+
+    /**
+     * Sync remote storm ids to local, only used for nimbus.
+     * @param remote remote ids from state store
+     */
+    void syncRemoteIds(Map<String, String> remote);
+
+    /**
+     * Delete a local cache of stormId which is mapped to a specific storm name.
+     * @param stormName storm name
+     */
+    void deleteStormId(String stormName);
+
+    /**
+     * Clear all the state for a storm.
+     * @param stormId storm id
+     */
+    void clearStateForStorm(String stormId);
+
+    /**
+     * Function to release resource.
+     */
+    @Override
+    void close();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java b/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
new file mode 100644
index 0000000..44cf6d2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.assignments;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.generated.Assignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An assignment backend which keeps all assignments and id-info in memory. This is the default
+ * backend used when no other backend is specified.
+ *
+ * <p>About thread safety: idToAssignment, idToName and nameToId are all in-memory caches local to nimbus, for
+ * <ul>
+ * <li>idToAssignment: nimbus will modify it and supervisors will sync it at fixed interval,
+ * so the assignments would come to eventual consistency.</li>
+ * <li>idToName: storm submitting/killing is guarded by the same lock, a {@link ConcurrentHashMap} is ok.</li>
+ * <li>nameToId: same as <i>idToName</i>.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): the three map fields are plain (non-volatile) references that are replaced by the
+ * {@code syncRemote*} methods and set to null by {@link #close()}; confirm that no caller relies on
+ * those replacements being immediately visible to other threads.
+ */
+public class InMemoryAssignmentBackend implements ILocalAssignmentsBackend {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryAssignmentBackend.class);
+
+    private Map<String, Assignment> idToAssignment;
+    private Map<String, String> idToName;
+    private Map<String, String> nameToId;
+    private volatile boolean isSynchronized = false;
+
+    @Override
+    public boolean isSynchronized() {
+        return this.isSynchronized;
+    }
+
+    @Override
+    public void setSynchronized() {
+        this.isSynchronized = true;
+    }
+
+    /**
+     * Creates the three empty caches; the configuration is currently unused.
+     * @param conf config (ignored for now)
+     */
+    @Override
+    public void prepare(Map conf) {
+        // do nothing for conf now
+        this.idToAssignment = new ConcurrentHashMap<>();
+        this.idToName = new ConcurrentHashMap<>();
+        this.nameToId = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void keepOrUpdateAssignment(String stormId, Assignment assignment) {
+        this.idToAssignment.put(stormId, assignment);
+    }
+
+    @Override
+    public Assignment getAssignment(String stormId) {
+        return this.idToAssignment.get(stormId);
+    }
+
+    @Override
+    public void removeAssignment(String stormId) {
+        this.idToAssignment.remove(stormId);
+    }
+
+    /**
+     * Returns a snapshot of the cached storm ids.
+     * @return a new list of storm ids, empty if the backend is not prepared or already closed
+     */
+    @Override
+    public List<String> assignments() {
+        // idToAssignment is null before prepare() and after close().
+        Map<String, Assignment> current = this.idToAssignment;
+        if (current == null) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(current.keySet());
+    }
+
+    /**
+     * Returns a snapshot of the cached assignments.
+     * @return a new map of storm id to assignment, empty if the backend is not prepared or already closed
+     */
+    @Override
+    public Map<String, Assignment> assignmentsInfo() {
+        // Same guard as assignments(): idToAssignment is null before prepare() and after close().
+        Map<String, Assignment> current = this.idToAssignment;
+        if (current == null) {
+            return new HashMap<>();
+        }
+        return new HashMap<>(current);
+    }
+
+    /**
+     * Replaces the assignment cache with the deserialized content of {@code remote}.
+     * @param remote serialized assignments keyed by storm id; must not be null because it is
+     *     iterated directly
+     */
+    @Override
+    public void syncRemoteAssignments(Map<String, byte[]> remote) {
+        Map<String, Assignment> tmp = new ConcurrentHashMap<>();
+        for (Map.Entry<String, byte[]> entry : remote.entrySet()) {
+            Assignment assignment = ClusterUtils.maybeDeserialize(entry.getValue(), Assignment.class);
+            // ConcurrentHashMap rejects null values, so skip entries that did not yield an assignment.
+            if (assignment != null) {
+                tmp.put(entry.getKey(), assignment);
+            }
+        }
+        this.idToAssignment = tmp;
+    }
+
+    @Override
+    public void keepStormId(String stormName, String stormId) {
+        this.nameToId.put(stormName, stormId);
+        this.idToName.put(stormId, stormName);
+    }
+
+    @Override
+    public String getStormId(String stormName) {
+        return this.nameToId.get(stormName);
+    }
+
+    /**
+     * Replaces both id caches with the content of {@code remote}.
+     * @param remote mapping of storm id to storm name; must not be null because it is iterated directly
+     */
+    @Override
+    public void syncRemoteIds(Map<String, String> remote) {
+        Map<String, String> tmpNameToId = new ConcurrentHashMap<>();
+        Map<String, String> tmpIdToName = new ConcurrentHashMap<>();
+        for (Map.Entry<String, String> entry : remote.entrySet()) {
+            tmpIdToName.put(entry.getKey(), entry.getValue());
+            tmpNameToId.put(entry.getValue(), entry.getKey());
+        }
+        this.idToName = tmpIdToName;
+        this.nameToId = tmpNameToId;
+    }
+
+    @Override
+    public void deleteStormId(String stormName) {
+        String id = this.nameToId.remove(stormName);
+        if (null != id) {
+            this.idToName.remove(id);
+        }
+    }
+
+    @Override
+    public void clearStateForStorm(String stormId) {
+        this.idToAssignment.remove(stormId);
+
+        String name = this.idToName.remove(stormId);
+        if (null != name) {
+            this.nameToId.remove(name);
+        }
+    }
+
+    /**
+     * Drops the references to all caches. Methods that dereference the caches will fail
+     * afterwards until {@link #prepare(Map)} is called again.
+     */
+    @Override
+    public void close() {
+        this.idToAssignment = null;
+        this.nameToId = null;
+        this.idToName = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java b/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
new file mode 100644
index 0000000..f3110f1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/assignments/LocalAssignmentsBackendFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.assignments;
+
+import com.google.common.base.Preconditions;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ReflectionUtils;
+
+import java.util.Map;
+
+/**
+ * Factory class for creating local assignments backends.
+ */
+public class LocalAssignmentsBackendFactory {
+
+    /**
+     * Create and prepare the backend configured under
+     * {@link Config#NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS}, falling back to {@link #getDefault()}
+     * when that key is absent.
+     * @param conf storm configuration, also handed to the backend's {@code prepare}
+     * @return a prepared backend
+     * @throws IllegalStateException if the configured class is not an {@link ILocalAssignmentsBackend}
+     */
+    public static ILocalAssignmentsBackend getBackend(Map<String, Object> conf) {
+        Object backendClass = conf.get(Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS);
+        if (backendClass == null) {
+            return getDefault();
+        }
+
+        Object targetObj = ReflectionUtils.newInstance((String) backendClass);
+        // Guava's Preconditions only substitutes %s placeholders, not SLF4J-style {}.
+        Preconditions.checkState(targetObj instanceof ILocalAssignmentsBackend,
+            "%s must implement ILocalAssignmentsBackend", Config.NIMBUS_LOCAL_ASSIGNMENTS_BACKEND_CLASS);
+        ILocalAssignmentsBackend backend = (ILocalAssignmentsBackend) targetObj;
+        backend.prepare(conf);
+        return backend;
+    }
+
+    /**
+     * Create an {@link InMemoryAssignmentBackend} prepared with the configuration returned by
+     * {@link ConfigUtils#readStormConfig()}.
+     * @return a prepared in-memory backend
+     */
+    public static ILocalAssignmentsBackend getDefault() {
+        ILocalAssignmentsBackend backend = new InMemoryAssignmentBackend();
+        backend.prepare(ConfigUtils.readStormConfig());
+        return backend;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 4bd5ded..b3dfc7d 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -18,6 +18,8 @@
package org.apache.storm.cluster;
import org.apache.storm.Config;
+import org.apache.storm.assignments.ILocalAssignmentsBackend;
+import org.apache.storm.assignments.LocalAssignmentsBackendFactory;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
@@ -49,6 +51,7 @@ public class ClusterUtils {
public static final String SUPERVISORS_ROOT = "supervisors";
public static final String WORKERBEATS_ROOT = "workerbeats";
public static final String BACKPRESSURE_ROOT = "backpressure";
+ public static final String LEADERINFO_ROOT = "leader-info";
public static final String ERRORS_ROOT = "errors";
public static final String BLOBSTORE_ROOT = "blobstore";
public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
@@ -63,6 +66,7 @@ public class ClusterUtils {
public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+ public static final String LEADERINFO_SUBTREE = ZK_SEPERATOR + LEADERINFO_ROOT;
public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
@@ -252,12 +256,13 @@ public class ClusterUtils {
return executorWhb;
}
- public IStormClusterState mkStormClusterStateImpl(Object stateStorage, ClusterStateContext context) throws Exception {
+ public IStormClusterState mkStormClusterStateImpl(Object stateStorage, ILocalAssignmentsBackend backend, ClusterStateContext context) throws Exception {
if (stateStorage instanceof IStateStorage) {
- return new StormClusterStateImpl((IStateStorage) stateStorage, context, false);
+ return new StormClusterStateImpl((IStateStorage) stateStorage, backend, context, false);
} else {
- IStateStorage Storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage, (Map<String, Object>) stateStorage, context);
- return new StormClusterStateImpl(Storage, context, true);
+ IStateStorage Storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage,
+ (Map<String, Object>) stateStorage, context);
+ return new StormClusterStateImpl(Storage, backend, context, true);
}
}
@@ -279,8 +284,12 @@ public class ClusterUtils {
return _instance.mkStateStorageImpl(config, auth_conf, context);
}
+    /**
+     * Create a cluster state using an explicitly supplied local assignments backend.
+     * Delegates to the (replaceable) {@code _instance}.
+     */
+    public static IStormClusterState mkStormClusterState(Object StateStorage, ILocalAssignmentsBackend backend,
+                                                         ClusterStateContext context) throws Exception {
+        return _instance.mkStormClusterStateImpl(StateStorage, backend, context);
+    }
+
public static IStormClusterState mkStormClusterState(Object StateStorage, ClusterStateContext context) throws Exception {
- return _instance.mkStormClusterStateImpl(StateStorage, context);
+ return _instance.mkStormClusterStateImpl(StateStorage, LocalAssignmentsBackendFactory.getDefault(), context);
}
public static String stringifyError(Throwable error) {
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
index 8593b7b..4f3865e 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
@@ -57,6 +57,8 @@ public enum DaemonType {
}
switch (type) {
case NIMBUS:
+ //Fall through on purpose
+ case SUPERVISOR:
return ZooDefs.Ids.CREATOR_ALL_ACL;
case DRPC:
List<ACL> ret = new ArrayList<>(ZooDefs.Ids.CREATOR_ALL_ACL);
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index ab893be..8a922ac 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -42,8 +42,46 @@ import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
List<String> assignments(Runnable callback);
+ /**
+ * Get the assignment based on storm id from local backend.
+ * @param stormId topology id
+ * @param callback callback function
+ * @return {@link Assignment}
+ */
Assignment assignmentInfo(String stormId, Runnable callback);
+ /**
+ * Get the assignment based on storm id from remote state store, eg: ZK.
+ * @param stormId topology id
+ * @param callback callback function
+ * @return {@link Assignment}
+ */
+ Assignment remoteAssignmentInfo(String stormId, Runnable callback);
+
+ /**
+ * Get all the topologies assignments mapping stormId -> Assignment from local backend.
+ * @return stormId -> Assignment mapping
+ */
+ Map<String, Assignment> assignmentsInfo();
+
+ /**
+ * Sync the remote state store assignments to local backend, used when master gains leadership, see
+ * {@link LeaderListenerCallback}
+ * @param remote assigned assignments for a specific {@link IStormClusterState} instance, usually a supervisor/node.
+ */
+ void syncRemoteAssignments(Map<String, byte[]> remote);
+
+ /**
+ * Flag to indicate if the assignments synced successfully, see {@link #syncRemoteAssignments(Map)}.
+ * @return true if is synced successfully
+ */
+ boolean isAssignmentsBackendSynchronized();
+
+ /**
+ * Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}
+ */
+ void setAssignmentsBackendSynchronized();
+
VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
@@ -64,6 +102,19 @@ public interface IStormClusterState {
*/
StormBase stormBase(String stormId, Runnable callback);
+ /**
+ * Get storm id from passed name, null if the name doesn't exist on cluster.
+ * @param stormName storm name
+ * @return storm id
+ */
+ String stormId(String stormName);
+
+ /**
+ * Sync all the active storm ids of the cluster, used now when master gains leadership.
+ * @param ids stormId -> stormName mapping
+ */
+ void syncRemoteIds(Map<String, String> ids);
+
ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
@@ -94,6 +145,15 @@ public interface IStormClusterState {
@Deprecated
List<String> backpressureTopologies();
+ /**
+ * Get leader info from state store, which was written when a master gains leadership.
+ * <p>Caution: it cannot be used for fencing and is only for informational purposes, because we currently use ZK
+ * as our backend, and the node info it holds may be stale.
+ * @param callback callback func
+ * @return {@link NimbusInfo}
+ */
+ NimbusInfo getLeader(Runnable callback);
+
void setTopologyLogConfig(String stormId, LogConfig logConfig);
LogConfig topologyLogConfig(String stormId, Runnable cb);
@@ -233,23 +293,7 @@ public interface IStormClusterState {
* @return the id of the topology or null if it is not alive.
*/
default Optional<String> getTopoId(final String topologyName) {
- String ret = null;
- for (String topoId: activeStorms()) {
- StormBase base = stormBase(topoId, null);
- if (base != null && topologyName.equals(base.get_name())) {
- ret = topoId;
- break;
- }
- }
- return Optional.ofNullable(ret);
- }
-
- default Map<String, Assignment> topologyAssignments() {
- Map<String, Assignment> ret = new HashMap<>();
- for (String topoId: assignments(null)) {
- ret.put(topoId, assignmentInfo(topoId, null));
- }
- return ret;
+ return Optional.ofNullable(stormId(topologyName));
}
default Map<String, StormBase> topologyBases() {
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 719ebbf..fc02b8e 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -20,6 +20,8 @@ package org.apache.storm.cluster;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -28,12 +30,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.assignments.ILocalAssignmentsBackend;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
@@ -64,6 +66,7 @@ public class StormClusterStateImpl implements IStormClusterState {
private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
private IStateStorage stateStorage;
+ private ILocalAssignmentsBackend assignmentsBackend;
private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
@@ -71,7 +74,7 @@ public class StormClusterStateImpl implements IStormClusterState {
private AtomicReference<Runnable> supervisorsCallback;
// we want to register a topo directory getChildren callback for all workers of this dir
private ConcurrentHashMap<String, Runnable> backPressureCallback;
-
+ private AtomicReference<Runnable> leaderInfoCallback;
private AtomicReference<Runnable> assignmentsCallback;
private ConcurrentHashMap<String, Runnable> stormBaseCallback;
private AtomicReference<Runnable> blobstoreCallback;
@@ -83,18 +86,20 @@ public class StormClusterStateImpl implements IStormClusterState {
private final boolean solo;
private final ClusterStateContext context;
- public StormClusterStateImpl(IStateStorage StateStorage, ClusterStateContext context, boolean solo) throws Exception {
+ public StormClusterStateImpl(IStateStorage StateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
+ ClusterStateContext context, boolean solo) throws Exception {
this.stateStorage = StateStorage;
this.solo = solo;
this.defaultAcls = context.getDefaultZkAcls();
this.context = context;
-
+ this.assignmentsBackend = assignmentsassignmentsBackend;
assignmentInfoCallback = new ConcurrentHashMap<>();
assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
assignmentVersionCallback = new ConcurrentHashMap<>();
supervisorsCallback = new AtomicReference<>();
backPressureCallback = new ConcurrentHashMap<>();
+ leaderInfoCallback = new AtomicReference<>();
assignmentsCallback = new AtomicReference<>();
stormBaseCallback = new ConcurrentHashMap<>();
credentialsCallback = new ConcurrentHashMap<>();
@@ -130,6 +135,8 @@ public class StormClusterStateImpl implements IStormClusterState {
issueMapCallback(logConfigCallback, toks.get(1));
} else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
issueMapCallback(backPressureCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) {
+ issueCallback(leaderInfoCallback);
} else {
LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
Runtime.getRuntime().exit(30);
@@ -159,14 +166,16 @@ public class StormClusterStateImpl implements IStormClusterState {
protected void issueCallback(AtomicReference<Runnable> cb) {
Runnable callback = cb.getAndSet(null);
- if (callback != null)
+ if (callback != null) {
callback.run();
+ }
}
protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
Runnable callback = callbackConcurrentHashMap.remove(key);
- if (callback != null)
+ if (callback != null) {
callback.run();
+ }
}
@Override
@@ -174,7 +183,7 @@ public class StormClusterStateImpl implements IStormClusterState {
if (callback != null) {
assignmentsCallback.set(callback);
}
- return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+ return this.assignmentsBackend.assignments();
}
@Override
@@ -182,11 +191,49 @@ public class StormClusterStateImpl implements IStormClusterState {
if (callback != null) {
assignmentInfoCallback.put(stormId, callback);
}
+ return this.assignmentsBackend.getAssignment(stormId);
+ }
+
+ @Override
+ public Assignment remoteAssignmentInfo(String stormId, Runnable callback) {
+ if (callback != null) {
+ assignmentInfoCallback.put(stormId, callback);
+ }
byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
}
@Override
+    public Map<String, Assignment> assignmentsInfo() {
+        // Answered entirely by the local assignments backend; the state storage is not consulted.
+        return assignmentsBackend.assignmentsInfo();
+    }
+
+    @Override
+    public void syncRemoteAssignments(Map<String, byte[]> remote) {
+        if (remote != null) {
+            // The caller already holds the serialized assignments: hand them to the local backend as-is.
+            assignmentsBackend.syncRemoteAssignments(remote);
+            return;
+        }
+
+        // Nothing supplied: read every assignment under the assignments subtree of the state storage
+        // (passing false for the watch flag) and sync that snapshot instead.
+        Map<String, byte[]> fetched = new HashMap<>();
+        for (String topoId : stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false)) {
+            fetched.put(topoId, stateStorage.get_data(ClusterUtils.assignmentPath(topoId), false));
+        }
+        assignmentsBackend.syncRemoteAssignments(fetched);
+    }
+
+    @Override
+    public boolean isAssignmentsBackendSynchronized() {
+        // Pure delegation: the flag is held by the local assignments backend.
+        return assignmentsBackend.isSynchronized();
+    }
+
+    @Override
+    public void setAssignmentsBackendSynchronized() {
+        // Pure delegation: marks the local assignments backend as synchronized.
+        assignmentsBackend.setSynchronized();
+    }
+
+ @Override
public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
if (callback != null) {
assignmentInfoWithVersionCallback.put(stormId, callback);
@@ -268,6 +315,25 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
+    public String stormId(String stormName) {
+        // Name -> id lookup is served by the local assignments backend, not by the state storage.
+        return assignmentsBackend.getStormId(stormName);
+    }
+
+    @Override
+    public void syncRemoteIds(Map<String, String> remote) {
+        if (null != remote) {
+            this.assignmentsBackend.syncRemoteIds(remote);
+            return;
+        }
+
+        // No mapping supplied: rebuild stormId -> stormName from the topologies returned by activeStorms().
+        Map<String, String> tmp = new HashMap<>();
+        for (String stormId : activeStorms()) {
+            StormBase base = stormBase(stormId, null);
+            // A topology can be removed between listing the ids and reading its base;
+            // skip it instead of failing the whole sync with a NullPointerException.
+            if (base != null) {
+                tmp.put(stormId, base.get_name());
+            }
+        }
+        this.assignmentsBackend.syncRemoteIds(tmp);
+    }
+
+ @Override
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
@@ -280,8 +346,9 @@ public class StormClusterStateImpl implements IStormClusterState {
List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId);
for (ProfileRequest profileRequest : profileRequests) {
NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
- if (nodeInfo1.equals(nodeInfo))
+ if (nodeInfo1.equals(nodeInfo)) {
requests.add(profileRequest);
+ }
}
return requests;
}
@@ -296,8 +363,9 @@ public class StormClusterStateImpl implements IStormClusterState {
String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
byte[] raw = stateStorage.get_data(childPath, false);
ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
- if (request != null)
+ if (request != null) {
profileRequests.add(request);
+ }
}
}
return profileRequests;
@@ -322,13 +390,13 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
- * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
- * situations like that
+ * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker
+ * with a skewed clock overrides all the timestamps. By only checking heartbeats with an assigned node+port,
+ * and only reading executors from that heartbeat that are actually assigned, we avoid situations like that.
*
- * @param stormId
- * @param executorNodePort
- * @return
+ * @param stormId topology id
+ * @param executorNodePort executor id -> node + port
+ * @return mapping of executorInfo -> executor beat
*/
@Override
public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
@@ -345,8 +413,9 @@ public class StormClusterStateImpl implements IStormClusterState {
for (List<Long> list : entry.getValue()) {
executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
}
- if (whb != null)
+ if (whb != null) {
executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+ }
}
return executorWhbs;
}
@@ -399,6 +468,14 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
+    public NimbusInfo getLeader(Runnable callback) {
+        // A non-null callback is stored for the leader-info path and also turns on the watch flag of the read.
+        boolean watch = callback != null;
+        if (watch) {
+            leaderInfoCallback.set(callback);
+        }
+        byte[] serialized = stateStorage.get_data(ClusterUtils.LEADERINFO_SUBTREE, watch);
+        return Utils.javaDeserialize(serialized, NimbusInfo.class);
+    }
+
+ @Override
public List<String> backpressureTopologies() {
return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
}
@@ -508,6 +585,7 @@ public class StormClusterStateImpl implements IStormClusterState {
public void activateStorm(String stormId, StormBase stormBase) {
String path = ClusterUtils.stormPath(stormId);
stateStorage.set_data(path, Utils.serialize(stormBase), defaultAcls);
+ this.assignmentsBackend.keepStormId(stormBase.get_name(), stormId);
}
/**
@@ -533,8 +611,9 @@ public class StormClusterStateImpl implements IStormClusterState {
newComponentExecutors.put(entry.getKey(), entry.getValue());
}
}
- if (newComponentExecutors.size() > 0)
+ if (newComponentExecutors.size() > 0) {
newElems.set_component_executors(newComponentExecutors);
+ }
}
Map<String, DebugOptions> ComponentDebug = new HashMap<>();
@@ -605,7 +684,9 @@ public class StormClusterStateImpl implements IStormClusterState {
@Override
public void setAssignment(String stormId, Assignment info) {
- stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), defaultAcls);
+ byte[] serAssignment = Utils.serialize(info);
+ stateStorage.set_data(ClusterUtils.assignmentPath(stormId), serAssignment, defaultAcls);
+ this.assignmentsBackend.keepOrUpdateAssignment(stormId, info);
}
@Override
@@ -636,6 +717,7 @@ public class StormClusterStateImpl implements IStormClusterState {
@Override
public void removeStorm(String stormId) {
stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+ this.assignmentsBackend.clearStateForStorm(stormId);
stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
@@ -697,8 +779,9 @@ public class StormClusterStateImpl implements IStormClusterState {
for (String child : childrens) {
String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
- if (errorInfo != null)
+ if (errorInfo != null) {
errorInfos.add(errorInfo);
+ }
}
}
Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
@@ -745,6 +828,7 @@ public class StormClusterStateImpl implements IStormClusterState {
stateStorage.unregister(stateId);
if (solo) {
stateStorage.close();
+ this.assignmentsBackend.close();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index d5c29f9..8ae6c5f 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.cluster;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.*;
import org.apache.curator.framework.state.ConnectionState;
@@ -33,13 +41,6 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class ZKStateStorage implements IStateStorage {
private static Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 0ae745f..5076045 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -15,14 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.cluster;
-import org.apache.storm.utils.Utils;
-import org.apache.zookeeper.data.ACL;
+package org.apache.storm.cluster;
import java.util.List;
import java.util.Map;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
public class ZKStateStorageFactory implements StateStorageFactory {
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index ca0d4d0..94ea9af 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -57,6 +57,7 @@ import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.security.auth.AuthUtils;
@@ -64,7 +65,9 @@ import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.data.ACL;
@@ -82,6 +85,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private final IContext context;
private final String topologyId;
private final String assignmentId;
+ private final int supervisorPort;
private final int port;
private final String workerId;
private final LogConfigManager logConfigManager;
@@ -105,15 +109,18 @@ public class Worker implements Shutdownable, DaemonCommon {
* @param context -
* @param topologyId - topology id
* @param assignmentId - assignment id
+ * @param supervisorPort - parent supervisor thrift server port
* @param port - port on which the worker runs
* @param workerId - worker id
*/
- public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
+ public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId) {
this.conf = conf;
this.context = context;
this.topologyId = topologyId;
this.assignmentId = assignmentId;
+ this.supervisorPort = supervisorPort;
this.port = port;
this.workerId = workerId;
this.logConfigManager = new LogConfigManager();
@@ -137,7 +144,7 @@ public class Worker implements Shutdownable, DaemonCommon {
ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
- IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, csContext);
+ IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
Map<String, String> initCreds = new HashMap<>();
@@ -156,8 +163,8 @@ public class Worker implements Shutdownable, DaemonCommon {
private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
- workerState = new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
- stormClusterState, autoCreds);
+ workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
+ topologyConf, stateStorage, stormClusterState, autoCreds);
// Heartbeat here so that worker process dies if this fails
// it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -178,7 +185,7 @@ public class Worker implements Shutdownable, DaemonCommon {
});
workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
workerState.registerCallbacks();
@@ -313,12 +320,14 @@ public class Worker implements Shutdownable, DaemonCommon {
public void doHeartBeat() throws IOException {
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
- state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
+ LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
workerState.localExecutors.stream()
.map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
- .collect(Collectors.toList()), workerState.port));
+ .collect(Collectors.toList()), workerState.port);
+ state.setWorkerHeartBeat(lsWorkerHeartbeat);
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
// it shouldn't take supervisor 120 seconds between listing dir and reading it
+ heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
}
public void doExecutorHeartbeats() {
@@ -392,6 +401,30 @@ public class Worker implements Shutdownable, DaemonCommon {
workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
}
+    /**
+     * Report this worker's heartbeat over thrift: first to the supervisor on this host and, if that call
+     * fails for any reason, directly to nimbus. Does nothing in local mode. Failures are logged and never
+     * propagated to the caller.
+     *
+     * @param lsWorkerHeartbeat heartbeat whose topology id, executors and timestamp are forwarded
+     */
+    private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lsWorkerHeartbeat) {
+        if (ConfigUtils.isLocalMode(this.conf)) {
+            return;
+        }
+        //In distributed mode, send heartbeat directly to master if local supervisor goes down.
+        SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
+            lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs());
+        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) {
+            client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+        } catch (Exception tr1) {
+            //If any error/exception thrown, report directly to nimbus.
+            // The {} placeholder is required: without it SLF4J silently drops the message argument.
+            LOG.warn("Exception when send heartbeat to local supervisor: {}", tr1.getMessage());
+            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+                nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+            } catch (Exception tr2) {
+                //if any error/exception thrown, just ignore.
+                // Both paths failed: pass the throwable itself so the cause and stack trace reach the log.
+                LOG.error("Exception when send heartbeat to master", tr2);
+            }
+        }
+    }
+
@Override
public void shutdown() {
try {
@@ -463,15 +496,17 @@ public class Worker implements Shutdownable, DaemonCommon {
}
public static void main(String[] args) throws Exception {
- Preconditions.checkArgument(args.length == 4, "Illegal number of arguments. Expected: 4, Actual: " + args.length);
+ Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
String stormId = args[0];
String assignmentId = args[1];
- String portStr = args[2];
- String workerId = args[3];
+ String supervisorPort = args[2];
+ String portStr = args[3];
+ String workerId = args[4];
Map<String, Object> conf = Utils.readStormConfig();
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
- Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
+ Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
+ Integer.parseInt(portStr), workerId);
worker.start();
Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index d080471..992dcbc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -81,6 +81,7 @@ import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils.SmartThread;
import org.slf4j.Logger;
@@ -199,6 +200,7 @@ public class WorkerState {
final IConnection receiver;
final String topologyId;
final String assignmentId;
+ final int supervisorPort;
final int port;
final String workerId;
final IStateStorage stateStorage;
@@ -272,18 +274,17 @@ public class WorkerState {
private final Collection<IAutoCredentials> autoCredentials;
private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
- public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId, int port, String workerId,
- Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
- Collection<IAutoCredentials> autoCredentials)
- throws IOException, InvalidTopologyException {
+ public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
+ int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
+ IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException, InvalidTopologyException {
this.autoCredentials = autoCredentials;
- this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
-
this.conf = conf;
+ this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
this.receiver = this.mqContext.bind(topologyId, port);
this.topologyId = topologyId;
this.assignmentId = assignmentId;
+ this.supervisorPort = supervisorPort;
this.port = port;
this.workerId = workerId;
this.stateStorage = stateStorage;
@@ -354,23 +355,7 @@ public class WorkerState {
}
public void refreshConnections(Runnable callback) throws Exception {
- Integer version = stormClusterState.assignmentVersion(topologyId, callback);
- version = (null == version) ? 0 : version;
- VersionedData<Assignment> assignmentVersion = assignmentVersions.get().get(topologyId);
- Assignment assignment;
- if (null != assignmentVersion && (assignmentVersion.getVersion() == version)) {
- assignment = assignmentVersion.getData();
- } else {
- VersionedData<Assignment>
- newAssignmentVersion = new VersionedData<>(version,
- stormClusterState.assignmentInfoWithVersion(topologyId, callback).getData());
- assignmentVersions.getAndUpdate(prev -> {
- Map<String, VersionedData<Assignment>> next = new HashMap<>(prev);
- next.put(topologyId, newAssignmentVersion);
- return next;
- });
- assignment = newAssignmentVersion.getData();
- }
+ Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
Set<NodeInfo> neededConnections = new HashSet<>();
Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
@@ -645,8 +630,7 @@ public class WorkerState {
LOG.info("Reading assignments");
List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
- Map<List<Long>, NodeInfo> executorToNodePort =
- stormClusterState.assignmentInfo(topologyId, null).get_executor_node_port();
+ Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
NodeInfo nodeInfo = entry.getValue();
if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
@@ -656,6 +640,21 @@ public class WorkerState {
return executorsAssignedToThisWorker;
}
+ /**
+ * Fetches the assignment for the given topology.
+ *
+ * <p>Outside local mode the assignment is requested from the supervisor reachable at
+ * {@code Utils.hostname()} on {@code supervisorPort}. If that request fails for any reason, the
+ * failure is logged and the assignment is read through
+ * {@code stormClusterState.remoteAssignmentInfo} instead. In local mode the supervisor is never
+ * contacted and the cluster state is used directly.
+ *
+ * @param conf configuration used to detect local mode and to configure the supervisor client
+ * @param stormClusterState cluster state used as the fallback source of the assignment
+ * @param topologyId id of the topology whose assignment is wanted
+ * @return the assignment returned by the supervisor, or by the cluster state on fallback
+ */
+ private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
+ if (ConfigUtils.isLocalMode(conf)) {
+ return stormClusterState.remoteAssignmentInfo(topologyId, null);
+ }
+ try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
+ supervisorPort)) {
+ return supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
+ } catch (Throwable tr1) {
+ // Deliberately best effort: if any error/exception is thrown, fetch it from zookeeper.
+ // Log the cause so a persistently unreachable supervisor is visible instead of silently masked.
+ LOG.warn("Failed to get local assignment for topology {} from supervisor port {}, falling back to cluster state",
+ topologyId, supervisorPort, tr1);
+ return stormClusterState.remoteAssignmentInfo(topologyId, null);
+ }
+ }
+
private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) {
Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java b/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
index 887dbc0..4bbd4f4 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/HBNodes.java
@@ -364,13 +364,13 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
case 1: // PULSE_IDS
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
- org.apache.thrift.protocol.TList _list886 = iprot.readListBegin();
- struct.pulseIds = new ArrayList<String>(_list886.size);
- String _elem887;
- for (int _i888 = 0; _i888 < _list886.size; ++_i888)
+ org.apache.thrift.protocol.TList _list912 = iprot.readListBegin();
+ struct.pulseIds = new ArrayList<String>(_list912.size);
+ String _elem913;
+ for (int _i914 = 0; _i914 < _list912.size; ++_i914)
{
- _elem887 = iprot.readString();
- struct.pulseIds.add(_elem887);
+ _elem913 = iprot.readString();
+ struct.pulseIds.add(_elem913);
}
iprot.readListEnd();
}
@@ -396,9 +396,9 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
oprot.writeFieldBegin(PULSE_IDS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.pulseIds.size()));
- for (String _iter889 : struct.pulseIds)
+ for (String _iter915 : struct.pulseIds)
{
- oprot.writeString(_iter889);
+ oprot.writeString(_iter915);
}
oprot.writeListEnd();
}
@@ -429,9 +429,9 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
if (struct.is_set_pulseIds()) {
{
oprot.writeI32(struct.pulseIds.size());
- for (String _iter890 : struct.pulseIds)
+ for (String _iter916 : struct.pulseIds)
{
- oprot.writeString(_iter890);
+ oprot.writeString(_iter916);
}
}
}
@@ -443,13 +443,13 @@ public class HBNodes implements org.apache.thrift.TBase<HBNodes, HBNodes._Fields
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TList _list891 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.pulseIds = new ArrayList<String>(_list891.size);
- String _elem892;
- for (int _i893 = 0; _i893 < _list891.size; ++_i893)
+ org.apache.thrift.protocol.TList _list917 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.pulseIds = new ArrayList<String>(_list917.size);
+ String _elem918;
+ for (int _i919 = 0; _i919 < _list917.size; ++_i919)
{
- _elem892 = iprot.readString();
- struct.pulseIds.add(_elem892);
+ _elem918 = iprot.readString();
+ struct.pulseIds.add(_elem918);
}
}
struct.set_pulseIds_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/0dac58b0/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java b/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
index cfed785..fae00cf 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/HBRecords.java
@@ -367,14 +367,14 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
case 1: // PULSES
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
- org.apache.thrift.protocol.TList _list878 = iprot.readListBegin();
- struct.pulses = new ArrayList<HBPulse>(_list878.size);
- HBPulse _elem879;
- for (int _i880 = 0; _i880 < _list878.size; ++_i880)
+ org.apache.thrift.protocol.TList _list904 = iprot.readListBegin();
+ struct.pulses = new ArrayList<HBPulse>(_list904.size);
+ HBPulse _elem905;
+ for (int _i906 = 0; _i906 < _list904.size; ++_i906)
{
- _elem879 = new HBPulse();
- _elem879.read(iprot);
- struct.pulses.add(_elem879);
+ _elem905 = new HBPulse();
+ _elem905.read(iprot);
+ struct.pulses.add(_elem905);
}
iprot.readListEnd();
}
@@ -400,9 +400,9 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
oprot.writeFieldBegin(PULSES_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.pulses.size()));
- for (HBPulse _iter881 : struct.pulses)
+ for (HBPulse _iter907 : struct.pulses)
{
- _iter881.write(oprot);
+ _iter907.write(oprot);
}
oprot.writeListEnd();
}
@@ -433,9 +433,9 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
if (struct.is_set_pulses()) {
{
oprot.writeI32(struct.pulses.size());
- for (HBPulse _iter882 : struct.pulses)
+ for (HBPulse _iter908 : struct.pulses)
{
- _iter882.write(oprot);
+ _iter908.write(oprot);
}
}
}
@@ -447,14 +447,14 @@ public class HBRecords implements org.apache.thrift.TBase<HBRecords, HBRecords._
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TList _list883 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.pulses = new ArrayList<HBPulse>(_list883.size);
- HBPulse _elem884;
- for (int _i885 = 0; _i885 < _list883.size; ++_i885)
+ org.apache.thrift.protocol.TList _list909 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.pulses = new ArrayList<HBPulse>(_list909.size);
+ HBPulse _elem910;
+ for (int _i911 = 0; _i911 < _list909.size; ++_i911)
{
- _elem884 = new HBPulse();
- _elem884.read(iprot);
- struct.pulses.add(_elem884);
+ _elem910 = new HBPulse();
+ _elem910.read(iprot);
+ struct.pulses.add(_elem910);
}
}
struct.set_pulses_isSet(true);