You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/03 15:50:05 UTC
[05/10] storm git commit: STORM-1281: LocalCluster,
testing4j and testing.clj to java
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalCluster.java b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
new file mode 100644
index 0000000..3b18ac9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
@@ -0,0 +1,824 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.DaemonCommon;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.nimbus.Nimbus;
+import org.apache.storm.daemon.nimbus.Nimbus.StandaloneINimbus;
+import org.apache.storm.daemon.supervisor.ReadClusterState;
+import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
+import org.apache.storm.daemon.supervisor.Supervisor;
+import org.apache.storm.executor.LocalExecutor;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus.Processor;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.RebalanceOptions;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.messaging.local.Context;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.security.auth.IGroupMappingServiceProvider;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.InProcessZookeeper;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.testing.TmpPath;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.StormCommonInstaller;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A stand alone storm cluster that runs inside a single process.
+ * It is intended to be used for testing. Both internal testing for
+ * Apache Storm itself and for people building storm topologies.
+ *
+ * LocalCluster is an AutoCloseable so if you are using it in tests you can use
+ * a try block to be sure it is shut down.
+ *
+ * try (LocalCluster cluster = new LocalCluster()) {
+ * // Do some tests
+ * }
+ * // The cluster has been shut down.
+ */
+public class LocalCluster implements ILocalCluster {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalCluster.class);
+
+ private static ThriftServer startNimbusDaemon(Map<String, Object> conf, Nimbus nimbus) {
+ ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
+ LOG.info("Starting Nimbus server...");
+ new Thread(() -> ret.serve()).start();
+ return ret;
+ }
+
+ /**
+ * Simple way to configure a LocalCluster to meet your needs.
+ */
+ public static class Builder {
+ private int supervisors = 2;
+ private int portsPerSupervisor = 3;
+ private Map<String, Object> daemonConf = new HashMap<>();
+ private INimbus inimbus = null;
+ private IGroupMappingServiceProvider groupMapper = null;
+ private int supervisorSlotPortMin = 1024;
+ private boolean nimbusDaemon = false;
+ private UnaryOperator<Nimbus> nimbusWrapper = null;
+ private BlobStore store = null;
+ private IStormClusterState clusterState = null;
+ private ILeaderElector leaderElector = null;
+ private String trackId = null;
+ private boolean simulateTime = false;
+
+ /**
+ * Set the number of supervisors the cluster should have.
+ */
+ public Builder withSupervisors(int supervisors) {
+ if (supervisors < 0) {
+ throw new IllegalArgumentException("supervisors cannot be negative");
+ }
+ this.supervisors = supervisors;
+ return this;
+ }
+
+ /**
+ * Set the number of slots/ports each supervisor should have
+ */
+ public Builder withPortsPerSupervisor(int portsPerSupervisor) {
+ if (portsPerSupervisor < 0) {
+ throw new IllegalArgumentException("supervisor ports cannot be negative");
+ }
+ this.portsPerSupervisor = portsPerSupervisor;
+ return this;
+ }
+
+ /**
+ * Set the base config that the daemons should use.
+ */
+ public Builder withDaemonConf(Map<String, Object> conf) {
+ if (conf != null) {
+ this.daemonConf = new HashMap<>(conf);
+ }
+ return this;
+ }
+
+ /**
+ * Add an single key/value config to the daemon conf
+ */
+ public Builder withDaemonConf(String key, Object value) {
+ this.daemonConf.put(key, value);
+ return this;
+ }
+
+ /**
+ * Override the INimbus instance that nimbus will use.
+ */
+ public Builder withINimbus(INimbus inimbus) {
+ this.inimbus = inimbus;
+ return this;
+ }
+
+ /**
+ * Override the code that maps users to groups for authorization.
+ */
+ public Builder withGroupMapper(IGroupMappingServiceProvider groupMapper) {
+ this.groupMapper = groupMapper;
+ return this;
+ }
+
+ /**
+ * When assigning ports to worker slots start at minPort.
+ */
+ public Builder withSupervisorSlotPortMin(Number minPort) {
+ int port = 1024;
+ if (minPort == null) {
+ LOG.warn("Number is null... {}", minPort);
+ } else {
+ port = minPort.intValue();
+ }
+ if (port <= 0) {
+ throw new IllegalArgumentException("port must be positive");
+ }
+ this.supervisorSlotPortMin = port;
+ return this;
+ }
+
+ /**
+ * Have the local nimbus actually launch a thrift server. This is intended to
+ * be used mostly for internal storm testing.
+ */
+ public Builder withNimbusDaemon() {
+ return withNimbusDaemon(true);
+ }
+
+ /**
+ * If nimbusDaemon is true the local nimbus will launch a thrift server. This is intended to
+ * be used mostly for internal storm testing.
+ */
+ public Builder withNimbusDaemon(Boolean nimbusDaemon) {
+ if (nimbusDaemon == null) {
+ nimbusDaemon = false;
+ LOG.warn("nimbusDaemon is null");
+ }
+ this.nimbusDaemon = nimbusDaemon;
+ return this;
+ }
+
+ /**
+ * Turn on simulated time in the cluster. This allows someone to simulate long periods of
+ * time for timeouts etc when testing nimbus/supervisors themselves. NOTE: that this only
+ * works for code that uses the {@link org.apache.storm.utils.Time} class for time management
+ * so it will not work in all cases.
+ */
+ public Builder withSimulatedTime() {
+ return withSimulatedTime(true);
+ }
+
+ /**
+ * Turn on simulated time in the cluster. This allows someone to simulate long periods of
+ * time for timeouts etc when testing nimbus/supervisors themselves. NOTE: that this only
+ * works for code that uses the {@link org.apache.storm.utils.Time} class for time management
+ * so it will not work in all cases.
+ */
+ public Builder withSimulatedTime(boolean simulateTime) {
+ this.simulateTime = simulateTime;
+ return this;
+ }
+
+ /**
+ * Before nimbus is created/used call nimbusWrapper on it first and use the
+ * result instead. This is intended for internal testing only, and it here to
+ * allow a mocking framework to spy on the nimbus class.
+ */
+ public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) {
+ this.nimbusWrapper = nimbusWrapper;
+ return this;
+ }
+
+ /**
+ * Use the following blobstore instead of the one in the config.
+ * This is intended mostly for internal testing with Mocks.
+ */
+ public Builder withBlobStore(BlobStore store) {
+ this.store = store;
+ return this;
+ }
+
+ /**
+ * Use the following clusterState instead of the one in the config.
+ * This is intended mostly for internal testing with Mocks.
+ */
+ public Builder withClusterState(IStormClusterState clusterState) {
+ this.clusterState = clusterState;
+ return this;
+ }
+
+ /**
+ * Use the following leaderElector instead of the one in the config.
+ * This is intended mostly for internal testing with Mocks.
+ */
+ public Builder withLeaderElector(ILeaderElector leaderElector) {
+ this.leaderElector = leaderElector;
+ return this;
+ }
+
+ /**
+ * A tracked cluster can run tracked topologies.
+ * See {@link org.apache.storm.testing.TrackedTopology} for more information
+ * on tracked topologies.
+ * @param trackId an arbitrary unique id that is used to keep track of tracked topologies
+ */
+ public Builder withTracked(String trackId) {
+ this.trackId = trackId;
+ return this;
+ }
+
+ /**
+ * A tracked cluster can run tracked topologies.
+ * See {@link org.apache.storm.testing.TrackedTopology} for more information
+ * on tracked topologies.
+ */
+ public Builder withTracked() {
+ this.trackId = Utils.uuid();
+ return this;
+ }
+
+ /**
+ * @return the LocalCluster
+ * @throws Exception on any one of many different errors.
+ * This is intended for testing so yes it is ugly and throws Exception...
+ */
+ public LocalCluster build() throws Exception {
+ return new LocalCluster(this);
+ }
+ }
+
+ private static class TrackedStormCommon extends StormCommon {
+ private final String id;
+ public TrackedStormCommon(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public IBolt makeAckerBoltImpl() {
+ return new NonRichBoltTracker(new Acker(), id);
+ }
+ }
+
+ private final Nimbus nimbus;
+ //This is very private and does not need to be exposed
+ private final AtomicInteger portCounter;
+ private final Map<String, Object> daemonConf;
+ private final List<Supervisor> supervisors;
+ private final IStateStorage state;
+ private final IStormClusterState clusterState;
+ private final List<TmpPath> tmpDirs;
+ private final InProcessZookeeper zookeeper;
+ private final IContext sharedContext;
+ private final ThriftServer thriftServer;
+ private final String trackId;
+ private final StormCommonInstaller commonInstaller;
+ private final SimulatedTime time;
+
+ /**
+ * Create a default LocalCluster
+ * @throws Exception on any error
+ */
+ public LocalCluster() throws Exception {
+ this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true));
+ }
+
+ /**
+ * Create a LocalCluster that connects to an existing Zookeeper instance
+ * @param zkHost the host for ZK
+ * @param zkPort the port for ZK
+ * @throws Exception on any error
+ */
+ public LocalCluster(String zkHost, Long zkPort) throws Exception {
+ this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true)
+ .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkHost))
+ .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, zkPort));
+ }
+
+ @SuppressWarnings("deprecation")
+ private LocalCluster(Builder builder) throws Exception {
+ if (builder.simulateTime) {
+ time = new SimulatedTime();
+ } else {
+ time = null;
+ }
+ this.trackId = builder.trackId;
+ if (trackId != null) {
+ ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
+ metrics.put("spout-emitted", new AtomicInteger(0));
+ metrics.put("transferred", new AtomicInteger(0));
+ metrics.put("processed", new AtomicInteger(0));
+ this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
+ LOG.warn("Adding tracked metrics for ID {}", this.trackId);
+ RegisteredGlobalState.setState(this.trackId, metrics);
+ LocalExecutor.setTrackId(this.trackId);
+ } else {
+ this.commonInstaller = null;
+ }
+
+ this.tmpDirs = new ArrayList<>();
+ this.supervisors = new ArrayList<>();
+ TmpPath nimbusTmp = new TmpPath();
+ this.tmpDirs.add(nimbusTmp);
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+ conf.put(Config.ZMQ_LINGER_MILLIS, 0);
+ conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+ conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
+ conf.put(Config.STORM_CLUSTER_MODE, "local");
+ conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
+ conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
+
+ InProcessZookeeper zookeeper = null;
+ if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
+ zookeeper = new InProcessZookeeper();
+ conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
+ conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+ }
+ this.zookeeper = zookeeper;
+ conf.putAll(builder.daemonConf);
+ this.daemonConf = new HashMap<>(conf);
+
+ this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+ ClusterStateContext cs = new ClusterStateContext();
+ this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
+ if (builder.clusterState == null) {
+ clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
+ } else {
+ this.clusterState = builder.clusterState;
+ }
+ //Set it for nimbus only
+ conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
+ Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
+ this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
+ if (builder.nimbusWrapper != null) {
+ nimbus = builder.nimbusWrapper.apply(nimbus);
+ }
+ this.nimbus = nimbus;
+ this.nimbus.launchServer();
+ IContext context = null;
+ if (!Utils.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
+ context = new Context();
+ context.prepare(this.daemonConf);
+ }
+ this.sharedContext = context;
+ this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;
+
+ for (int i = 0; i < builder.supervisors; i++) {
+ addSupervisor(builder.portsPerSupervisor, null, null);
+ }
+
+ //Wait for a leader to be elected (or topology submission can be rejected)
+ try {
+ long timeoutAfter = System.currentTimeMillis() + 10_000;
+ while (!hasLeader()) {
+ if (timeoutAfter > System.currentTimeMillis()) {
+ throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
+ }
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ //Ignore any exceptions we might be doing a test for authentication
+ }
+ }
+
+ private boolean hasLeader() throws AuthorizationException, TException {
+ ClusterSummary summary = getNimbus().getClusterInfo();
+ if (summary.is_set_nimbuses()) {
+ for (NimbusSummary sum: summary.get_nimbuses()) {
+ if (sum.is_isLeader()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return Nimbus itself so you can interact with it directly, if needed.
+ */
+ public Nimbus getNimbus() {
+ return nimbus;
+ }
+
+ /**
+ * @return the base config for the daemons.
+ */
+ public Map<String, Object> getDaemonConf() {
+ return new HashMap<>(daemonConf);
+ }
+
+ public static final KillOptions KILL_NOW = new KillOptions();
+ static {
+ KILL_NOW.set_wait_secs(0);
+ }
+
+ /**
+ * When running a topology locally, for tests etc. It is helpful to be sure
+ * that the topology is dead before the test exits. This is an AutoCloseable
+ * topology that not only gives you access to the compiled StormTopology
+ * but also will kill the topology when it closes.
+ *
+ * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) {
+ * // Run Some test
+ * }
+ * // The topology has been killed
+ */
+ public class LocalTopology extends StormTopology implements ILocalTopology {
+ private static final long serialVersionUID = 6145919776650637748L;
+ private final String topoName;
+
+ public LocalTopology(String topoName, StormTopology topo) {
+ super(topo);
+ this.topoName = topoName;
+ }
+
+ @Override
+ public void close() throws TException {
+ killTopologyWithOpts(topoName, KILL_NOW);
+ }
+ }
+
+ @Override
+ public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, StormTopology topology)
+ throws TException {
+ if (!Utils.isValidConf(conf)) {
+ throw new IllegalArgumentException("Topology conf is not json-serializable");
+ }
+ getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), topology);
+
+ ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
+ if (hook != null) {
+ TopologyInfo topologyInfo = Utils.getTopologyInfo(topologyName, null, conf);
+ try {
+ hook.notify(topologyInfo, conf, topology);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return new LocalTopology(topologyName, topology);
+ }
+
+ @Override
+ public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, StormTopology topology, SubmitOptions submitOpts)
+ throws TException {
+ if (!Utils.isValidConf(conf)) {
+ throw new IllegalArgumentException("Topology conf is not json-serializable");
+ }
+ getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), topology, submitOpts);
+ return new LocalTopology(topologyName, topology);
+ }
+
+ @Override
+ public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology)
+ throws TException {
+ submitTopology(topologyName, conf, topology.getTopology());
+ return new LocalTopology(topologyName, topology.getTopology());
+ }
+
+ @Override
+ public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts)
+ throws TException {
+ submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts);
+ return new LocalTopology(topologyName, topology.getTopology());
+ }
+
+ @Override
+ public void uploadNewCredentials(String topologyName, Credentials creds) throws TException {
+ getNimbus().uploadNewCredentials(topologyName, creds);
+ }
+
+ @Override
+ public void killTopology(String topologyName) throws TException {
+ getNimbus().killTopology(topologyName);
+ }
+
+ @Override
+ public void killTopologyWithOpts(String name, KillOptions options) throws TException {
+ getNimbus().killTopologyWithOpts(name, options);
+ }
+
+
+ @Override
+ public void activate(String topologyName) throws TException {
+ getNimbus().activate(topologyName);
+ }
+
+
+ @Override
+ public void deactivate(String topologyName) throws TException {
+ getNimbus().deactivate(topologyName);
+ }
+
+
+ @Override
+ public void rebalance(String name, RebalanceOptions options) throws TException {
+ getNimbus().rebalance(name, options);
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public String getTopologyConf(String id) throws TException {
+ return getNimbus().getTopologyConf(id);
+ }
+
+
+ @Override
+ public StormTopology getTopology(String id) throws TException {
+ return getNimbus().getTopology(id);
+ }
+
+
+ @Override
+ public ClusterSummary getClusterInfo() throws TException {
+ return getNimbus().getClusterInfo();
+ }
+
+
+ @Override
+ public TopologyInfo getTopologyInfo(String id) throws TException {
+ return getNimbus().getTopologyInfo(id);
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ nimbus.shutdown();
+ if (thriftServer != null) {
+ LOG.info("shutting down thrift server");
+ try {
+ thriftServer.stop();
+ } catch (Exception e) {
+ LOG.info("failed to stop thrift", e);
+ }
+ }
+ if (state != null) {
+ state.close();
+ }
+ if (getClusterState() != null) {
+ getClusterState().disconnect();
+ }
+ for (Supervisor s: supervisors) {
+ s.shutdownAllWorkers(null, ReadClusterState.THREAD_DUMP_ON_ERROR);
+ s.close();
+ }
+ ProcessSimulator.killAllProcesses();
+ if (zookeeper != null) {
+ LOG.info("Shutting down in process zookeeper");
+ zookeeper.close();
+ LOG.info("Done shutting down in process zookeeper");
+ }
+
+ for (TmpPath p: tmpDirs) {
+ p.close();
+ }
+
+ if (this.trackId != null) {
+ LOG.warn("Clearing tracked metrics for ID {}", this.trackId);
+ LocalExecutor.clearTrackId();
+ RegisteredGlobalState.clearState(this.trackId);
+ }
+
+ if (this.commonInstaller != null) {
+ this.commonInstaller.close();
+ }
+
+ if (time != null) {
+ time.close();
+ }
+ }
+
+ /**
+ * Get a specific Supervisor. This is intended mostly for internal testing.
+ * @param id the id of the supervisor
+ */
+ public synchronized Supervisor getSupervisor(String id) {
+ for (Supervisor s: supervisors) {
+ if (id.equals(s.getId())) {
+ return s;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Kill a specific supervisor. This is intended mostly for internal testing.
+ * @param id the id of the supervisor
+ */
+ public synchronized void killSupervisor(String id) {
+ for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext();) {
+ Supervisor s = it.next();
+ if (id.equals(s.getId())) {
+ it.remove();
+ s.close();
+ //tmpDir will be handled separately
+ return;
+ }
+ }
+ }
+
+ /**
+ * Add another supervisor to the topology. This is intended mostly for internal testing.
+ */
+ public Supervisor addSupervisor() throws Exception {
+ return addSupervisor(null, null, null);
+ }
+
+ /**
+ * Add another supervisor to the topology. This is intended mostly for internal testing.
+ * @param ports the number of ports/slots the supervisor should have
+ */
+ public Supervisor addSupervisor(Number ports) throws Exception {
+ return addSupervisor(ports, null, null);
+ }
+
+ /**
+ * Add another supervisor to the topology. This is intended mostly for internal testing.
+ * @param ports the number of ports/slots the supervisor should have
+ * @param id the id of the new supervisor, so you can find it later.
+ */
+ public Supervisor addSupervisor(Number ports, String id) throws Exception {
+ return addSupervisor(ports, null, id);
+ }
+
+ /**
+ * Add another supervisor to the topology. This is intended mostly for internal testing.
+ * @param ports the number of ports/slots the supervisor should have
+ * @param conf any config values that should be added/over written in the daemon conf of the cluster.
+ * @param id the id of the new supervisor, so you can find it later.
+ */
+ public synchronized Supervisor addSupervisor(Number ports, Map<String, Object> conf, String id) throws Exception {
+ if (ports == null) {
+ ports = 2;
+ }
+ TmpPath tmpDir = new TmpPath();
+ tmpDirs.add(tmpDir);
+
+ List<Integer> portNumbers = new ArrayList<>(ports.intValue());
+ for (int i = 0; i < ports.intValue(); i++) {
+ portNumbers.add(portCounter.getAndIncrement());
+ }
+
+ Map<String, Object> superConf = new HashMap<>(daemonConf);
+ if (conf != null) {
+ superConf.putAll(conf);
+ }
+ superConf.put(Config.STORM_LOCAL_DIR, tmpDir.getPath());
+ superConf.put(Config.SUPERVISOR_SLOTS_PORTS, portNumbers);
+
+ final String superId = id == null ? Utils.uuid() : id;
+ ISupervisor isuper = new StandaloneSupervisor() {
+ @Override
+ public String generateSupervisorId() {
+ return superId;
+ }
+ };
+ if (!ConfigUtils.isLocalMode(superConf)) {
+ throw new IllegalArgumentException("Cannot start server in distrubuted mode!");
+ }
+
+ Supervisor s = new Supervisor(superConf, sharedContext, isuper);
+ s.launch();
+ supervisors.add(s);
+ return s;
+ }
+
+ private boolean areAllSupervisorsWaiting() {
+ boolean ret = true;
+ for (Supervisor s: supervisors) {
+ ret = ret && s.isWaiting();
+ }
+ return ret;
+ }
+
+ private static boolean areAllWorkersWaiting() {
+ boolean ret = true;
+ for (Shutdownable s: ProcessSimulator.getAllProcessHandles()) {
+ if (s instanceof DaemonCommon) {
+ ret = ret && ((DaemonCommon)s).isWaiting();
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Wait for the cluster to be idle. This is intended to be used with
+ * Simulated time and is for internal testing.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws AssertionError if the cluster did not come to an idle point with
+ * a timeout.
+ */
+ public void waitForIdle() throws InterruptedException {
+ waitForIdle(Testing.TEST_TIMEOUT_MS);
+ }
+
+ /**
+ * Wait for the cluster to be idle. This is intended to be used with
+ * Simulated time and is for internal testing.
+ * @param timeoutMs the number of ms to wait before throwing an error.
+ * @throws InterruptedException if interrupted while waiting.
+ * @throws AssertionError if the cluster did not come to an idle point with
+ * a timeout.
+ */
+ public void waitForIdle(long timeoutMs) throws InterruptedException {
+ Random rand = ThreadLocalRandom.current();
+ //wait until all workers, supervisors, and nimbus is waiting
+ final long endTime = System.currentTimeMillis() + timeoutMs;
+ while (!(nimbus.isWaiting() &&
+ areAllSupervisorsWaiting() &&
+ areAllWorkersWaiting())) {
+ if (System.currentTimeMillis() >= endTime) {
+ LOG.info("Cluster was not idle in {} ms", timeoutMs);
+ LOG.info(Utils.threadDump());
+ throw new AssertionError("Test timed out (" + timeoutMs + "ms) cluster not idle");
+ }
+ Thread.sleep(rand.nextInt(20));
+ }
+ }
+
+ @Override
+ public void advanceClusterTime(int secs) throws InterruptedException {
+ advanceClusterTime(secs, 1);
+ }
+
+ @Override
+ public void advanceClusterTime(int secs, int incSecs) throws InterruptedException {
+ for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
+ int diff = Math.min(incSecs, amountLeft);
+ Time.advanceTimeSecs(diff);
+ waitForIdle();
+ }
+ }
+
+ @Override
+ public IStormClusterState getClusterState() {
+ return clusterState;
+ }
+
+ @Override
+ public String getTrackedId() {
+ return trackId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index ccdf634..e3f34d1 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -17,24 +17,30 @@
*/
package org.apache.storm;
+import java.util.Map;
+
import org.apache.storm.daemon.DrpcServer;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServiceRegistry;
-import org.apache.storm.utils.Utils;
import org.apache.thrift.TException;
-import java.util.Map;
-
+/**
+ * A Local way to test DRPC
+ *
+ * try (LocalDRPC drpc = new LocalDRPC()) {
+ * // Do tests
+ * }
+ */
public class LocalDRPC implements ILocalDRPC {
private final DrpcServer handler;
private final String serviceId;
public LocalDRPC() {
- Map conf = ConfigUtils.readStormConfig();
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
handler = new DrpcServer(conf);
serviceId = ServiceRegistry.registerService(handler);
}
@@ -61,12 +67,17 @@ public class LocalDRPC implements ILocalDRPC {
@Override
public void shutdown() {
- ServiceRegistry.unregisterService(this.serviceId);
- this.handler.close();
+ close();
}
@Override
public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
return handler.fetchRequest(functionName);
}
+
+ @Override
+ public void close() {
+ ServiceRegistry.unregisterService(this.serviceId);
+ this.handler.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Testing.java b/storm-core/src/jvm/org/apache/storm/Testing.java
new file mode 100644
index 0000000..be90ae8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/Testing.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.CompletableSpout;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.FixedTupleSpout;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestJob;
+import org.apache.storm.testing.TrackedTopology;
+import org.apache.storm.testing.TupleCaptureBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.RegisteredGlobalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility that helps with testing topologies, Bolts and Spouts.
+ */
+public class Testing {
+ private static final Logger LOG = LoggerFactory.getLogger(Testing.class);
+ /**
+ * The default amount of wall time should be spent waiting for
+ * specific conditions to happen. Default is 10 seconds unless
+ * the environment variable STORM_TEST_TIMEOUT_MS is set.
+ */
+ public static final int TEST_TIMEOUT_MS;
+ static {
+ int timeout = 10_000;
+ try {
+ timeout = Integer.parseInt(System.getenv("STORM_TEST_TIMEOUT_MS"));
+ } catch (Exception e) {
+ //Ignored, will go with default timeout
+ }
+ TEST_TIMEOUT_MS = timeout;
+ }
+
+ /**
+ * Simply produces a boolean to see if a specific state is true or false.
+ */
+ public static interface Condition {
+ public boolean exec();
+ }
+
+ /**
+ * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
+ * passed
+ * @param condition what we are waiting for
+ * @param body what to run in the loop
+ * @throws AssertionError if teh loop timed out.
+ */
+ public static void whileTimeout(Condition condition, Runnable body) {
+ whileTimeout(TEST_TIMEOUT_MS, condition, body);
+ }
+
+ /**
+ * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
+ * passed
+ * @param the number of ms to wait before timing out.
+ * @param condition what we are waiting for
+ * @param body what to run in the loop
+ * @throws AssertionError if teh loop timed out.
+ */
+ public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+ LOG.debug("Looping until {}", condition);
+ while (condition.exec()) {
+ if (System.currentTimeMillis() > endTime) {
+ LOG.info("Condition {} not met in {} ms", condition, timeoutMs);
+ LOG.info(Utils.threadDump());
+ throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
+ }
+ body.run();
+ }
+ LOG.debug("Condition met {}", condition);
+ }
+
+ /**
+ * Convenience method for data.stream.allMatch(pred)
+ */
+ public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
+ return data.stream().allMatch(pred);
+ }
+
+ /**
+ * Run with simulated time
+ * @deprecated use ```
+ * try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+ * ...
+ * }
+ * ```
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withSimulatedTime(Runnable code) {
+ try (SimulatedTime st = new SimulatedTime()) {
+ code.run();
+ }
+ }
+
+ private static LocalCluster cluster(MkClusterParam param, boolean simulated) throws Exception {
+ return cluster(param, null, simulated);
+ }
+
+ private static LocalCluster cluster(MkClusterParam param) throws Exception {
+ return cluster(param, null, false);
+ }
+
+ private static LocalCluster cluster(MkClusterParam param, String id, boolean simulated) throws Exception {
+ Integer supervisors = param.getSupervisors();
+ if (supervisors == null) {
+ supervisors = 2;
+ }
+ Integer ports = param.getPortsPerSupervisor();
+ if (ports == null) {
+ ports = 3;
+ }
+ Map<String, Object> conf = param.getDaemonConf();
+ if (conf == null) {
+ conf = new HashMap<>();
+ }
+ return new LocalCluster.Builder()
+ .withSupervisors(supervisors)
+ .withPortsPerSupervisor(ports)
+ .withDaemonConf(conf)
+ .withNimbusDaemon(param.isNimbusDaemon())
+ .withTracked(id)
+ .withSimulatedTime(simulated)
+ .build();
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster()) {
+ * ...
+ * }
+ * ```
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withLocalCluster(TestJob code) {
+ withLocalCluster(new MkClusterParam(), code);
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
+ * ...
+ * }
+ * ```
+ * @param param configs to set in the cluster
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withLocalCluster(MkClusterParam param, TestJob code) {
+ try (LocalCluster lc = cluster(param)) {
+ code.run(lc);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
+ * ...
+ * }
+ * ```
+ * @param clusterConf some configs to set in the cluster
+ */
+ @Deprecated
+ public static ILocalCluster getLocalCluster(Map<String, Object> clusterConf) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> conf = (Map<String, Object>) clusterConf.get("daemon-conf");
+ if (conf == null) {
+ conf = new HashMap<>();
+ }
+ Number supervisors = (Number) clusterConf.getOrDefault("supervisors", 2);
+ Number ports = (Number) clusterConf.getOrDefault("ports-per-supervisor", 3);
+ INimbus inimbus = (INimbus) clusterConf.get("inimbus");
+ Number portMin = (Number) clusterConf.getOrDefault("supervisor-slot-port-min", 1024);
+ Boolean nimbusDaemon = (Boolean) clusterConf.getOrDefault("nimbus-daemon", false);
+ try {
+ return new LocalCluster.Builder()
+ .withSupervisors(supervisors.intValue())
+ .withDaemonConf(conf)
+ .withPortsPerSupervisor(ports.intValue())
+ .withINimbus(inimbus)
+ .withSupervisorSlotPortMin(portMin)
+ .withNimbusDaemon(nimbusDaemon)
+ .build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
+ * ...
+ * }
+ * ```
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withSimulatedTimeLocalCluster(TestJob code) {
+ withSimulatedTimeLocalCluster(new MkClusterParam(), code);
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) {
+ * ...
+ * }
+ * ```
+ * @param param configs to set in the cluster
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code) {
+ try (LocalCluster lc = cluster(param, true)) {
+ code.run(lc);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Run with a local cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) {
+ * ...
+ * }
+ * ```
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withTrackedCluster(TestJob code) {
+ withTrackedCluster(new MkClusterParam(), code);
+ }
+
+ /**
+ * In a tracked topology some metrics are tracked. This provides a way to get those metrics.
+ * This is intended mostly for internal testing.
+ * @param id the id of the tracked cluster
+ * @param key the name of the metric to get.
+ * @return the metric
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public static int globalAmt(String id, String key) {
+ LOG.warn("Reading tracked metrics for ID {}", id);
+ return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+ }
+
+ /**
+ * Run with a local tracked cluster
+ * @deprecated use ```
+ * try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) {
+ * ...
+ * }
+ * ```
+ * @param param configs to set in the cluster
+ * @param code what to run
+ */
+ @Deprecated
+ public static void withTrackedCluster(MkClusterParam param, TestJob code) {
+ try (LocalCluster lc = cluster(param, Utils.uuid(), true)) {
+ code.run(lc);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * A topology that has all messages captured and can be read later on.
+ * This is intended mostly for internal testing.
+ * @param <T> the topology (tracked or regular)
+ */
+ public static final class CapturedTopology<T> {
+ public final T topology;
+ /**
+ * a Bolt that will hold all of the captured data.
+ */
+ public final TupleCaptureBolt capturer;
+
+ public CapturedTopology(T topology, TupleCaptureBolt capturer) {
+ this.topology = topology;
+ this.capturer = capturer;
+ }
+ }
+
+ /**
+ * Track and capture a topology.
+ * This is intended mostly for internal testing.
+ */
+ public static CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology) {
+ CapturedTopology<StormTopology> captured = captureTopology(topology);
+ return new CapturedTopology<>(new TrackedTopology(captured.topology, cluster), captured.capturer);
+ }
+
+ /**
+ * Rewrites a topology so that all the tuples flowing through it are captured
+ * @param topology the topology to rewrite
+ * @return the modified topology and a new Bolt that can retrieve the
+ * captured tuples.
+ */
+ public static CapturedTopology<StormTopology> captureTopology(StormTopology topology) {
+ topology = topology.deepCopy(); //Don't modify the original
+
+ TupleCaptureBolt capturer = new TupleCaptureBolt();
+ Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>();
+ for (Map.Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) {
+ String id = spoutEntry.getKey();
+ for (Entry<String, StreamInfo> streamEntry : spoutEntry.getValue().get_common().get_streams().entrySet()) {
+ String stream = streamEntry.getKey();
+ StreamInfo info = streamEntry.getValue();
+ if (info.is_direct()) {
+ captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
+ } else {
+ captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
+ }
+ }
+ }
+
+ for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
+ String id = boltEntry.getKey();
+ for (Entry<String, StreamInfo> streamEntry : boltEntry.getValue().get_common().get_streams().entrySet()) {
+ String stream = streamEntry.getKey();
+ StreamInfo info = streamEntry.getValue();
+ if (info.is_direct()) {
+ captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
+ } else {
+ captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
+ }
+ }
+ }
+ topology.put_to_bolts(Utils.uuid(), new Bolt(Thrift.serializeComponentObject(capturer),
+ Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null)));
+ return new CapturedTopology<>(topology, capturer);
+ }
+
+ /**
+ * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
+ * instances of {@link org.apache.storm.testing.CompletableSpout}
+ * @param cluster the cluster to submit the topology to
+ * @param topology the topology itself
+ * @return a map of the component to the list of tuples it emitted.
+ * @throws InterruptedException
+ * @throws TException on any error from nimbus.
+ */
+ public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, TException {
+ return completeTopology(cluster, topology, new CompleteTopologyParam());
+ }
+
+ /**
+ * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
+ * instances of {@link org.apache.storm.testing.CompletableSpout} or are overwritten by MockedSources in param
+ * @param cluster the cluster to submit the topology to
+ * @param topology the topology itself
+ * @param param parameters to describe how to complete a topology.
+ * @return a map of the component to the list of tuples it emitted.
+ * @throws InterruptedException
+ * @throws TException on any error from nimbus.
+ */
+ public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology,
+ CompleteTopologyParam param) throws TException, InterruptedException {
+ Map<String, List<FixedTuple>> ret = null;
+ IStormClusterState state = cluster.getClusterState();
+ CapturedTopology<StormTopology> capTopo = captureTopology(topology);
+ topology = capTopo.topology;
+ String topoName = param.getTopologyName();
+ if (topoName == null) {
+ topoName = "topologytest-" + Utils.uuid();
+ }
+
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ MockedSources ms = param.getMockedSources();
+ if (ms != null) {
+ for (Entry<String, List<FixedTuple>> mocked: ms.getData().entrySet()) {
+ FixedTupleSpout newSpout = new FixedTupleSpout(mocked.getValue());
+ spouts.get(mocked.getKey()).set_spout_object(Thrift.serializeComponentObject(newSpout));
+ }
+ }
+ List<Object> spoutObjects = spouts.values().stream().
+ map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList());
+
+ for (Object o: spoutObjects) {
+ if (!(o instanceof CompletableSpout)) {
+ throw new RuntimeException("Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
+ }
+ }
+
+ for (Object spout: spoutObjects) {
+ ((CompletableSpout)spout).startup();
+ }
+
+ cluster.submitTopology(topoName, param.getStormConf(), topology);
+
+ if (Time.isSimulating()) {
+ cluster.advanceClusterTime(11);
+ }
+
+ String topoId = state.getTopoId(topoName).get();
+ //Give the topology time to come up without using it to wait for the spouts to complete
+ simulateWait(cluster);
+ Integer timeoutMs = param.getTimeoutMs();
+ if (timeoutMs == null) {
+ timeoutMs = TEST_TIMEOUT_MS;
+ }
+ whileTimeout(timeoutMs,
+ () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout)o).isExhausted()),
+ () -> {
+ try {
+ simulateWait(cluster);
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ });
+
+ KillOptions killOpts = new KillOptions();
+ killOpts.set_wait_secs(0);
+ cluster.killTopologyWithOpts(topoName, killOpts);
+
+ whileTimeout(timeoutMs,
+ () -> state.assignmentInfo(topoId, null) != null,
+ () -> {
+ try {
+ simulateWait(cluster);
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ });
+
+ if (param.getCleanupState()) {
+ for (Object o : spoutObjects) {
+ ((CompletableSpout)o).clean();
+ }
+ ret = capTopo.capturer.getAndRemoveResults();
+ } else {
+ ret = capTopo.capturer.getAndClearResults();
+ }
+
+ return ret;
+ }
+
+ /**
+ * If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.
+ */
+ public static void simulateWait(ILocalCluster cluster) throws InterruptedException {
+ if (Time.isSimulating()) {
+ cluster.advanceClusterTime(10);
+ Thread.sleep(100);
+ }
+ }
+
+ /**
+ * Get all of the tuples from a given component on the default stream
+ * @param results the results of running a completed topology
+ * @param componentId the id of the component to look at
+ * @return a list of the tuple values.
+ */
+ public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId) {
+ return readTuples(results, componentId, Utils.DEFAULT_STREAM_ID);
+ }
+
+ /**
+ * Get all of the tuples from a given component on a given stream
+ * @param results the results of running a completed topology
+ * @param componentId the id of the component to look at
+ * @param streamId the id of the stream to look for.
+ * @return a list of the tuple values.
+ */
+ public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId, String streamId) {
+ List<List<Object>> ret = new ArrayList<>();
+ List<FixedTuple> streamResult = results.get(componentId);
+ if (streamResult != null) {
+ for (FixedTuple tuple: streamResult) {
+ if (streamId.equals(tuple.stream)) {
+ ret.add(tuple.values);
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Create a tracked topology.
+ * @deprecated use {@link org.apache.storm.testing.TrackedTopology} directly.
+ */
+ @Deprecated
+ public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) {
+ return new TrackedTopology(topology, cluster);
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(CapturedTopology<TrackedTopology> topo) {
+ topo.topology.trackedWait();
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt) {
+ topo.topology.trackedWait(amt);
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) {
+ topo.topology.trackedWait(amt, timeoutMs);
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(TrackedTopology topo) {
+ topo.trackedWait();
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(TrackedTopology topo, Integer amt) {
+ topo.trackedWait(amt);
+ }
+
+ /**
+ * Simulated time wait for a tracked topology. This is intended for internal testing
+ */
+ public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs) {
+ topo.trackedWait(amt, timeoutMs);
+ }
+
+ /**
+ * Simulated time wait for a cluster. This is intended for internal testing
+ */
+ public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException {
+ advanceClusterTime(cluster, secs, 1);
+ }
+
+ /**
+ * Simulated time wait for a cluster. This is intended for internal testing
+ */
+ public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException {
+ cluster.advanceClusterTime(secs, step);
+ }
+
+ /**
+ * Count how many times each element appears in the Collection
+ * @param c a collection of values
+ * @return a map of the unique values in c to the count of those values.
+ */
+ public static <T> Map<T, Integer> multiset(Collection<T> c) {
+ Map<T, Integer> ret = new HashMap<T, Integer>();
+ for (T t: c) {
+ Integer i = ret.get(t);
+ if (i == null) {
+ i = new Integer(0);
+ }
+ i += 1;
+ ret.put(t, i);
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static void printRec(Object o, String prefix) {
+ if (o instanceof Collection) {
+ LOG.info("{} {} ({}) [",prefix,o, o.getClass());
+ for (Object sub: (Collection)o) {
+ printRec(sub, prefix + " ");
+ }
+ LOG.info("{} ]",prefix);
+ } else if (o instanceof Map) {
+ Map<?,?> m = (Map<?,?>)o;
+ LOG.info("{} {} ({}) {",prefix,o, o.getClass());
+ for (Map.Entry<?, ?> entry: m.entrySet()) {
+ printRec(entry.getKey(), prefix + " ");
+ LOG.info("{} ->", prefix);
+ printRec(entry.getValue(), prefix + " ");
+ }
+ LOG.info("{} }",prefix);
+ } else {
+ LOG.info("{} {} ({})", prefix, o, o.getClass());
+ }
+ }
+
+ /**
+ * Check if two collections are equivalent ignoring the order of elements.
+ */
+ public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
+ boolean ret = multiset(a).equals(multiset(b));
+ if (! ret) {
+ printRec(multiset(a), "MS-A:");
+ printRec(multiset(b), "MS-B:");
+ }
+ return ret;
+ }
+
+ /**
+ * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
+ * @param values the values to appear in the tuple
+ */
+ public static Tuple testTuple(List<Object> values) {
+ return testTuple(values, new MkTupleParam());
+ }
+
+ /**
+ * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
+ * @param values the values to appear in the tuple
+ * @param param parametrs describing more details about the tuple
+ */
+ public static Tuple testTuple(List<Object> values, MkTupleParam param) {
+ String stream = param.getStream();
+ if (stream == null) {
+ stream = Utils.DEFAULT_STREAM_ID;
+ }
+
+ String component = param.getComponent();
+ if (component == null) {
+ component = "component";
+ }
+
+ int task = 1;
+
+ List<String> fields = param.getFields();
+ if (fields == null) {
+ fields = new ArrayList<>(values.size());
+ for (int i = 1; i <= values.size(); i++) {
+ fields.add("field"+i);
+ }
+ }
+
+ Map<Integer, String> taskToComp = new HashMap<>();
+ taskToComp.put(task, component);
+ Map<String, Map<String, Fields>> compToStreamToFields = new HashMap<>();
+ Map<String, Fields> streamToFields = new HashMap<>();
+ streamToFields.put(stream, new Fields(fields));
+ compToStreamToFields.put(component, streamToFields);
+
+ TopologyContext context= new TopologyContext(null,
+ ConfigUtils.readStormConfig(),
+ taskToComp,
+ null,
+ compToStreamToFields,
+ "test-storm-id",
+ null,
+ null,
+ 1,
+ null,
+ null,
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ new AtomicBoolean(false));
+ return new TupleImpl(context, values, 1, stream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Thrift.java b/storm-core/src/jvm/org/apache/storm/Thrift.java
index cde822f..9ed8bb2 100644
--- a/storm-core/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-core/src/jvm/org/apache/storm/Thrift.java
@@ -278,7 +278,7 @@ public class Thrift {
public static SpoutSpec prepareSerializedSpoutDetails(IRichSpout spout, Map<String, StreamInfo> outputs) {
return new SpoutSpec(ComponentObject.serialized_java
- (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap(), outputs, null, null));
+ (Utils.javaSerialize(spout)), prepareComponentCommon(new HashMap<>(), outputs, null, null));
}
public static Bolt prepareSerializedBoltDetails(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, Map<String, StreamInfo> outputs,
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index c0b78d3..c550cfe 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -30,6 +30,7 @@ import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +49,8 @@ public class BlobStoreUtils {
private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
- public static CuratorFramework createZKClient(Map conf) {
+ public static CuratorFramework createZKClient(Map<String, Object> conf) {
+ @SuppressWarnings("unchecked")
List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
@@ -106,7 +108,7 @@ public class BlobStoreUtils {
}
// Download missing blobs from potential nimbodes
- public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
+ public static boolean downloadMissingBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
throws TTransportException {
NimbusClient client;
ReadableBlobMeta rbm;
@@ -156,7 +158,7 @@ public class BlobStoreUtils {
}
// Download updated blobs from potential nimbodes
- public static boolean downloadUpdatedBlob(Map conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
+ public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos)
throws TTransportException {
NimbusClient client;
ClientBlobStore remoteBlobStore;
@@ -215,17 +217,17 @@ public class BlobStoreUtils {
return keyList;
}
- public static void createStateInZookeeper(Map conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
+ public static void createStateInZookeeper(Map<String, Object> conf, String key, NimbusInfo nimbusInfo) throws TTransportException {
ClientBlobStore cb = new NimbusBlobStore();
cb.setClient(conf, new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null));
cb.createStateInZookeeper(key);
}
- public static void updateKeyForBlobStore (Map conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
+ public static void updateKeyForBlobStore (Map<String, Object> conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) {
try {
// Most of clojure tests currently try to access the blobs using getBlob. Since, updateKeyForBlobStore
// checks for updating the correct version of the blob as a part of nimbus ha before performing any
- // operation on it, there is a neccessity to stub several test cases to ignore this method. It is a valid
+ // operation on it, there is a necessity to stub several test cases to ignore this method. It is a valid
// trade off to return if nimbusDetails which include the details of the current nimbus host port data are
// not initialized as a part of the test. Moreover, this applies to only local blobstore when used along with
// nimbus ha.
@@ -238,6 +240,7 @@ public class BlobStoreUtils {
return;
}
stateInfo = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+
LOG.debug("StateInfo for update {}", stateInfo);
Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
@@ -252,6 +255,9 @@ public class BlobStoreUtils {
LOG.debug("Updating state inside zookeeper for an update");
createStateInZookeeper(conf, key, nimbusDetails);
}
+ } catch (NoNodeException e) {
+ //race condition with a delete
+ return;
} catch (Exception exp) {
throw new RuntimeException(exp);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index c2c62bd..8df981b 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -26,6 +26,7 @@ import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -270,7 +271,12 @@ public class LocalFsBlobStore extends BlobStore {
if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
return 0;
}
- replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
+ try {
+ replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
+ } catch (NoNodeException e) {
+ //Race with delete
+ //If it is not here the replication is 0
+ }
return replicationCount;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 443e471..704c9e5 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -53,6 +53,12 @@ public interface IStormClusterState {
public List<String> activeStorms();
+ /**
+ * Get a storm base for a topology
+ * @param stormId the id of the topology
+ * @param callback something to call if the data changes (best effort)
+ * @return the StormBase or null if it is not alive.
+ */
public StormBase stormBase(String stormId, Runnable callback);
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
@@ -183,8 +189,10 @@ public interface IStormClusterState {
Map<String, StormBase> stormBases = new HashMap<>();
for (String topologyId : activeStorms()) {
StormBase base = stormBase(topologyId, null);
- stormBases.put(topologyId, base);
+ if (base != null) { //rece condition with delete
+ stormBases.put(topologyId, base);
+ }
}
return stormBases;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index 63d39f8..65c7416 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -749,9 +749,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static String extractStatusStr(StormBase base) {
String ret = null;
- TopologyStatus status = base.get_status();
- if (status != null) {
- ret = status.name().toUpperCase();
+ if (base != null) {
+ TopologyStatus status = base.get_status();
+ if (status != null) {
+ ret = status.name().toUpperCase();
+ }
}
return ret;
}
@@ -1308,18 +1310,15 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
minReplicationCount, confCount, codeCount, jarCount);
}
- private TopologyDetails readTopologyDetails(String topoId) throws NotAliveException, KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
- StormBase base = stormClusterState.stormBase(topoId, null);
- if (base == null) {
- if (topoId == null) {
- throw new NullPointerException();
- }
- throw new NotAliveException(topoId);
- }
+ private TopologyDetails readTopologyDetails(String topoId, StormBase base) throws KeyNotFoundException,
+ AuthorizationException, IOException, InvalidTopologyException {
+ assert (base != null);
+ assert (topoId != null);
+
BlobStore store = blobStore;
Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
StormTopology topo = readStormTopologyAsNimbus(topoId, store);
- Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId);
+ Map<List<Integer>, String> rawExecToComponent = computeExecutorToComponent(topoId, base);
Map<ExecutorDetails, String> executorsToComponent = new HashMap<>();
for (Entry<List<Integer>, String> entry: rawExecToComponent.entrySet()) {
List<Integer> execs = entry.getKey();
@@ -1380,9 +1379,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
- private List<List<Integer>> computeExecutors(String topoId) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
+ private List<List<Integer>> computeExecutors(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
BlobStore store = blobStore;
- StormBase base = stormClusterState.stormBase(topoId, null);
+ assert (base != null);
+
Map<String, Integer> compToExecutors = base.get_component_executors();
Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
StormTopology topology = readStormTopologyAsNimbus(topoId, store);
@@ -1405,9 +1405,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
- private Map<List<Integer>, String> computeExecutorToComponent(String topoId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+ private Map<List<Integer>, String> computeExecutorToComponent(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
BlobStore store = blobStore;
- List<List<Integer>> executors = computeExecutors(topoId);
+ List<List<Integer>> executors = computeExecutors(topoId, base);
StormTopology topology = readStormTopologyAsNimbus(topoId, store);
Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(topology, topoConf);
@@ -1418,11 +1418,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ret;
}
- private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Collection<String> topoIds) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+ private Map<String, Set<List<Integer>>> computeTopologyToExecutors(Map<String, StormBase> bases) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
Map<String, Set<List<Integer>>> ret = new HashMap<>();
- if (topoIds != null) {
- for (String topoId: topoIds) {
- ret.put(topoId, new HashSet<>(computeExecutors(topoId)));
+ if (bases != null) {
+ for (Entry<String, StormBase> entry: bases.entrySet()) {
+ String topoId = entry.getKey();
+ ret.put(topoId, new HashSet<>(computeExecutors(topoId, entry.getValue())));
}
}
return ret;
@@ -1579,8 +1580,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
- Topologies topologies, String scratchTopologyId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
- Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(existingAssignments.keySet());
+ Topologies topologies, Map<String, StormBase> bases, String scratchTopologyId) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
+ Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);
updateAllHeartbeats(existingAssignments, topoToExec);
@@ -1641,12 +1642,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return cluster.getAssignments();
}
- private TopologyResources getResourcesForTopology(String topoId) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
+ private TopologyResources getResourcesForTopology(String topoId, StormBase base) throws NotAliveException, AuthorizationException, InvalidTopologyException, IOException {
TopologyResources ret = idToResources.get().get(topoId);
if (ret == null) {
try {
IStormClusterState state = stormClusterState;
- TopologyDetails details = readTopologyDetails(topoId);
+ TopologyDetails details = readTopologyDetails(topoId, base);
double sumOnHeap = 0.0;
double sumOffHeap = 0.0;
double sumCPU = 0.0;
@@ -1720,10 +1721,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
//read all the topologies
- List<String> topologyIds = state.activeStorms();
+ Map<String, StormBase> bases = state.topologyBases();
Map<String, TopologyDetails> tds = new HashMap<>();
- for (String id: topologyIds) {
- tds.put(id, readTopologyDetails(id));
+ for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
+ Entry<String, StormBase> entry = it.next();
+ String id = entry.getKey();
+ try {
+ tds.put(id, readTopologyDetails(id, entry.getValue()));
+ } catch (KeyNotFoundException e) {
+ //A race happened and it is probably not running
+ it.remove();
+ }
}
Topologies topologies = new Topologies(tds);
List<String> assignedTopologyIds = state.assignments(null);
@@ -1737,7 +1745,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
}
// make the new assignments for topologies
- Map<String, SchedulerAssignment> newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, scratchTopoId);
+ Map<String, SchedulerAssignment> newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
for (String id: assignedTopologyIds) {
if (!topologyToExecutorToNodePort.containsKey(id)) {
@@ -1754,8 +1762,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
Assignment existingAssignment = existingAssignments.get(topoId);
Set<String> allNodes = new HashSet<>();
- for (List<Object> nodePort: execToNodePort.values()) {
- allNodes.add((String) nodePort.get(0));
+ if (execToNodePort != null) {
+ for (List<Object> nodePort: execToNodePort.values()) {
+ allNodes.add((String) nodePort.get(0));
+ }
}
Map<String, String> allNodeHost = new HashMap<>();
if (existingAssignment != null) {
@@ -2247,7 +2257,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (status != null) {
summary.set_sched_status(status);
}
- TopologyResources resources = getResourcesForTopology(topoId);
+ TopologyResources resources = getResourcesForTopology(topoId, base);
if (resources != null) {
summary.set_requested_memonheap(resources.getRequestedMemOnHeap());
summary.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3271,6 +3281,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getTopologyInfoWithOptsCalls.mark();
CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyInfo");
+ if (common.base == null) {
+ throw new NotAliveException(topoId);
+ }
IStormClusterState state = stormClusterState;
NumErrorsChoice numErrChoice = OR(options.get_num_err_choice(), NumErrorsChoice.ALL);
Map<String, List<ErrorInfo>> errors = new HashMap<>();
@@ -3332,7 +3345,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (schedStatus != null) {
topoInfo.set_sched_status(schedStatus);
}
- TopologyResources resources = getResourcesForTopology(topoId);
+ TopologyResources resources = getResourcesForTopology(topoId, common.base);
if (resources != null) {
topoInfo.set_requested_memonheap(resources.getRequestedMemOnHeap());
topoInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3370,27 +3383,32 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
StormTopology topology = common.topology;
Map<String, Object> topoConf = common.topoConf;
StormBase base = common.base;
- if (assignment == null) {
+ if (base == null) {
throw new NotAliveException(topoId);
}
- Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
- Map<String, String> nodeToHost = assignment.get_node_host();
Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
+ List<WorkerSummary> workerSummaries = null;
Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
- for (Entry<List<Long>, NodeInfo> entry: execToNodeInfo.entrySet()) {
- NodeInfo ni = entry.getValue();
- List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
- exec2NodePort.put(entry.getKey(), nodePort);
+ if (assignment != null) {
+ Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
+ Map<String, String> nodeToHost = assignment.get_node_host();
+ for (Entry<List<Long>, NodeInfo> entry: execToNodeInfo.entrySet()) {
+ NodeInfo ni = entry.getValue();
+ List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
+ exec2NodePort.put(entry.getKey(), nodePort);
+ }
+
+ workerSummaries = StatsUtil.aggWorkerStats(topoId,
+ topoName,
+ taskToComp,
+ beats,
+ exec2NodePort,
+ nodeToHost,
+ workerToResources,
+ includeSys,
+ true); //this is the topology page, so we know the user is authorized
}
- List<WorkerSummary> workerSummaries = StatsUtil.aggWorkerStats(topoId,
- topoName,
- taskToComp,
- beats,
- exec2NodePort,
- nodeToHost,
- workerToResources,
- includeSys,
- true); //this is the topology page, so we know the user is authorized
+
TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
exec2NodePort,
taskToComp,
@@ -3412,7 +3430,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
commonStats.set_resources_map(setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf));
}
- topoPageInfo.set_workers(workerSummaries);
+ if (workerSummaries != null) {
+ topoPageInfo.set_workers(workerSummaries);
+ }
if (base.is_set_owner()) {
topoPageInfo.set_owner(base.get_owner());
}
@@ -3420,7 +3440,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (schedStatus != null) {
topoPageInfo.set_sched_status(schedStatus);
}
- TopologyResources resources = getResourcesForTopology(topoId);
+ TopologyResources resources = getResourcesForTopology(topoId, base);
if (resources != null) {
topoPageInfo.set_requested_memonheap(resources.getRequestedMemOnHeap());
topoPageInfo.set_requested_memoffheap(resources.getRequestedMemOffHeap());
@@ -3525,6 +3545,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getComponentPageInfoCalls.mark();
CommonTopoInfo info = getCommonTopoInfo(topoId, "getComponentPageInfo");
+ if (info.base == null) {
+ throw new NotAliveException(topoId);
+ }
StormTopology topology = info.topology;
Map<String, Object> topoConf = info.topoConf;
Assignment assignment = info.assignment;
@@ -3557,7 +3580,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
compPageInfo.set_topology_name(info.topoName);
compPageInfo.set_errors(stormClusterState.errors(topoId, componentId));
compPageInfo.set_topology_status(extractStatusStr(info.base));
- if (info.base != null && info.base.is_set_component_debug()) {
+ if (info.base.is_set_component_debug()) {
DebugOptions debug = info.base.get_component_debug().get(componentId);
if (debug != null) {
compPageInfo.set_debug_options(debug);
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
index 2dbefcb..6297dda 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Map;
import org.apache.storm.ProcessSimulator;
-import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.worker.Worker;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
@@ -30,9 +29,6 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import clojure.java.api.Clojure;
-import clojure.lang.IFn;
-
public class LocalContainer extends Container {
private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
private volatile boolean _isAlive = false;
http://git-wip-us.apache.org/repos/asf/storm/blob/4efcc996/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java b/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
new file mode 100644
index 0000000..e9a61f5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/testing/CompletableSpout.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.testing;
+
+public interface CompletableSpout {
+ /**
+ * @return true if all the tuples have been completed else false.
+ */
+ public boolean isExhausted();
+
+ /**
+ * Cleanup any global state kept
+ */
+ default public void clean() {
+ //NOOP
+ }
+
+ /**
+ * Prepare the spout (globally) before starting the topology
+ */
+ default public void startup() {
+ //NOOP
+ }
+}