Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:28 UTC
[39/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
new file mode 100644
index 0000000..5abb04a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Config;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ExtendedThreadPoolExecutor;
+import org.apache.storm.utils.ServiceRegistry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+
+public class DRPCSpout extends BaseRichSpout {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ static final long serialVersionUID = 2387848310969237877L;
+
+ public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
+
+ SpoutOutputCollector _collector;
+ List<DRPCInvocationsClient> _clients = new ArrayList<>();
+ transient LinkedList<Future<Void>> _futures = null;
+ transient ExecutorService _background = null;
+ String _function;
+ String _local_drpc_id = null;
+
+ private static class DRPCMessageId {
+ String id;
+ int index;
+
+ public DRPCMessageId(String id, int index) {
+ this.id = id;
+ this.index = index;
+ }
+ }
+
+
+ public DRPCSpout(String function) {
+ _function = function;
+ }
+
+ public DRPCSpout(String function, ILocalDRPC drpc) {
+ _function = function;
+ _local_drpc_id = drpc.getServiceId();
+ }
+
+ public String get_function() {
+ return _function;
+ }
+
+ private class Adder implements Callable<Void> {
+ private String server;
+ private int port;
+ private Map conf;
+
+ public Adder(String server, int port, Map conf) {
+ this.server = server;
+ this.port = port;
+ this.conf = conf;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
+ synchronized (_clients) {
+ _clients.add(c);
+ }
+ return null;
+ }
+ }
+
+ private void reconnectAsync(final DRPCInvocationsClient client) {
+ _futures.add(_background.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ client.reconnectClient();
+ return null;
+ }
+ }));
+ }
+
+ private void reconnectSync(DRPCInvocationsClient client) {
+ try {
+ LOG.info("reconnecting... ");
+ client.reconnectClient(); //Blocking call
+ } catch (TException e2) {
+ LOG.error("Failed to connect to DRPC server", e2);
+ }
+ }
+
+ private void checkFutures() {
+ Iterator<Future<Void>> i = _futures.iterator();
+ while (i.hasNext()) {
+ Future<Void> f = i.next();
+ if (f.isDone()) {
+ i.remove();
+ try {
+ f.get(); // surface any exception from the finished connect/reconnect task
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ if(_local_drpc_id==null) {
+ _background = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+ _futures = new LinkedList<>();
+
+ int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
+ int index = context.getThisTaskIndex();
+
+ int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
+ List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
+ if(servers == null || servers.isEmpty()) {
+ throw new RuntimeException("No DRPC servers configured for topology");
+ }
+
+ if (numTasks < servers.size()) {
+ for (String s: servers) {
+ _futures.add(_background.submit(new Adder(s, port, conf)));
+ }
+ } else {
+ int i = index % servers.size();
+ _futures.add(_background.submit(new Adder(servers.get(i), port, conf)));
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ for(DRPCInvocationsClient client: _clients) {
+ client.close();
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ boolean gotRequest = false;
+ if(_local_drpc_id==null) {
+ int size = 0;
+ synchronized (_clients) {
+ size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+ }
+ for(int i=0; i<size; i++) {
+ DRPCInvocationsClient client;
+ synchronized (_clients) {
+ client = _clients.get(i);
+ }
+ if (!client.isConnected()) {
+ LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
+ reconnectAsync(client);
+ continue;
+ }
+ try {
+ DRPCRequest req = client.fetchRequest(_function);
+ if(req.get_request_id().length() > 0) {
+ Map returnInfo = new HashMap();
+ returnInfo.put("id", req.get_request_id());
+ returnInfo.put("host", client.getHost());
+ returnInfo.put("port", client.getPort());
+ gotRequest = true;
+ _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
+ break;
+ }
+ } catch (AuthorizationException aze) {
+ reconnectAsync(client);
+ LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
+ } catch (TException e) {
+ reconnectAsync(client);
+ LOG.error("Failed to fetch DRPC result from DRPC server", e);
+ } catch (Exception e) {
+ LOG.error("Failed to fetch DRPC result from DRPC server", e);
+ }
+ }
+ checkFutures();
+ } else {
+ DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+ if(drpc!=null) { // drpc can be null during DRPC server shutdown while the topology is still up
+ try {
+ DRPCRequest req = drpc.fetchRequest(_function);
+ if(req.get_request_id().length() > 0) {
+ Map returnInfo = new HashMap();
+ returnInfo.put("id", req.get_request_id());
+ returnInfo.put("host", _local_drpc_id);
+ returnInfo.put("port", 0);
+ gotRequest = true;
+ _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
+ new DRPCMessageId(req.get_request_id(), 0));
+ }
+ } catch (AuthorizationException aze) {
+ throw new RuntimeException(aze);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if(!gotRequest) {
+ Utils.sleep(1);
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ DRPCMessageId did = (DRPCMessageId) msgId;
+ DistributedRPCInvocations.Iface client;
+
+ if (_local_drpc_id == null) {
+ client = _clients.get(did.index);
+ } else {
+ client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+ }
+
+ int retryCnt = 0;
+ int maxRetries = 3;
+
+ while (retryCnt < maxRetries) {
+ retryCnt++;
+ try {
+ client.failRequest(did.id);
+ break;
+ } catch (AuthorizationException aze) {
+ LOG.error("Not authorized to failRequest from DRPC server", aze);
+ throw new RuntimeException(aze);
+ } catch (TException tex) {
+ if (retryCnt >= maxRetries) {
+ LOG.error("Failed to fail request", tex);
+ break;
+ }
+ reconnectSync((DRPCInvocationsClient)client);
+ }
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("args", "return-info"));
+ }
+}
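
For context, DRPCSpout pairs with the ReturnResults bolt (added later in this commit) when a DRPC topology is wired by hand rather than through LinearDRPCTopologyBuilder. A minimal sketch in the style of Storm's DRPC documentation; ExclaimBolt and the component names here are illustrative, not part of this commit:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.drpc.DRPCSpout;
    import org.apache.storm.drpc.ReturnResults;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class ManualDRPCTopology {
        // Illustrative bolt: appends "!" to the args and passes "return-info" through
        public static class ExclaimBolt extends BaseBasicBolt {
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                String args = input.getString(0);
                String returnInfo = input.getString(1);
                collector.emit(new Values(args + "!", returnInfo));
            }
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result", "return-info"));
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Polls the configured DRPC servers for requests to the "exclamation" function
            builder.setSpout("drpc", new DRPCSpout("exclamation"));
            builder.setBolt("exclaim", new ExclaimBolt(), 3).shuffleGrouping("drpc");
            // Sends the result back to the DRPC server named in "return-info"
            builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
            StormSubmitter.submitTopology("exclamation-drpc", new Config(), builder.createTopology());
        }
    }
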
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
new file mode 100644
index 0000000..f57bbb1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JoinResult extends BaseRichBolt {
+ public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);
+
+ String returnComponent;
+ Map<Object, Tuple> returns = new HashMap<>();
+ Map<Object, Tuple> results = new HashMap<>();
+ OutputCollector _collector;
+
+ public JoinResult(String returnComponent) {
+ this.returnComponent = returnComponent;
+ }
+
+ public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+ _collector = collector;
+ }
+
+ public void execute(Tuple tuple) {
+ Object requestId = tuple.getValue(0);
+ if(tuple.getSourceComponent().equals(returnComponent)) {
+ returns.put(requestId, tuple);
+ } else {
+ results.put(requestId, tuple);
+ }
+
+ if(returns.containsKey(requestId) && results.containsKey(requestId)) {
+ Tuple result = results.remove(requestId);
+ Tuple returner = returns.remove(requestId);
+ LOG.debug(result.getValue(1).toString());
+ List<Tuple> anchors = new ArrayList<>();
+ anchors.add(result);
+ anchors.add(returner);
+ _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1)));
+ _collector.ack(result);
+ _collector.ack(returner);
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("result", "return-info"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
new file mode 100644
index 0000000..39860e6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.KeyedRoundRobinQueue;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class KeyedFairBolt implements IRichBolt, FinishedCallback {
+ IRichBolt _delegate;
+ KeyedRoundRobinQueue<Tuple> _rrQueue;
+ Thread _executor;
+ FinishedCallback _callback;
+
+ public KeyedFairBolt(IRichBolt delegate) {
+ _delegate = delegate;
+ }
+
+ public KeyedFairBolt(IBasicBolt delegate) {
+ this(new BasicBoltExecutor(delegate));
+ }
+
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ if(_delegate instanceof FinishedCallback) {
+ _callback = (FinishedCallback) _delegate;
+ }
+ _delegate.prepare(stormConf, context, collector);
+ _rrQueue = new KeyedRoundRobinQueue<Tuple>();
+ _executor = new Thread(new Runnable() {
+ public void run() {
+ try {
+ while(true) {
+ _delegate.execute(_rrQueue.take());
+ }
+ } catch (InterruptedException e) {
+
+ }
+ }
+ });
+ _executor.setDaemon(true);
+ _executor.start();
+ }
+
+ public void execute(Tuple input) {
+ Object key = input.getValue(0);
+ _rrQueue.add(key, input);
+ }
+
+ public void cleanup() {
+ _executor.interrupt();
+ _delegate.cleanup();
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ _delegate.declareOutputFields(declarer);
+ }
+
+ public void finishedId(Object id) {
+ if(_callback!=null) {
+ _callback.finishedId(id);
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return new HashMap<String, Object>();
+ }
+}
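
KeyedFairBolt wraps a delegate bolt with a per-key round-robin queue keyed on field 0, so one hot key (for example one DRPC request id) cannot starve the others. A minimal sketch of wrapping a bolt; SlowWorkBolt and the component ids are illustrative:

    import org.apache.storm.drpc.KeyedFairBolt;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class KeyedFairExample {
        // Illustrative delegate; field 0 of its input is the fairness key
        public static class SlowWorkBolt extends BaseBasicBolt {
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                collector.emit(new Values(input.getValue(0), input.getString(1).toUpperCase()));
            }
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "result"));
            }
        }

        public static void wire(TopologyBuilder builder) {
            // KeyedFairBolt queues tuples per key and feeds the delegate
            // round-robin across keys on a background daemon thread
            builder.setBolt("work", new KeyedFairBolt(new SlowWorkBolt()), 4)
                   .fieldsGrouping("prepare", new Fields("request"));
        }
    }
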
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
new file mode 100644
index 0000000..6f82f80
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ComponentConfigurationDeclarer;
+import org.apache.storm.tuple.Fields;
+
+public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
+ public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+ public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
+
+ public LinearDRPCInputDeclarer globalGrouping();
+ public LinearDRPCInputDeclarer globalGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer shuffleGrouping();
+ public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer localOrShuffleGrouping();
+ public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer noneGrouping();
+ public LinearDRPCInputDeclarer noneGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer allGrouping();
+ public LinearDRPCInputDeclarer allGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer directGrouping();
+ public LinearDRPCInputDeclarer directGrouping(String streamId);
+
+ public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+ public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
+
+ public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+ public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
+
+}
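
The no-argument groupings above carry no source component: LinearDRPCTopologyBuilder (next file in this commit) binds each declaration to the previous bolt in the chain when the topology is assembled. A minimal sketch of the call-site shape, with placeholder bolts passed in:

    import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
    import org.apache.storm.topology.IBasicBolt;
    import org.apache.storm.tuple.Fields;

    public class DeclarerUsage {
        // Groupings on the returned declarer name no source component;
        // the builder binds each one to the previous bolt in the chain
        public static void chain(LinearDRPCTopologyBuilder builder,
                                 IBasicBolt expand, IBasicBolt aggregate) {
            builder.addBolt(expand, 12).shuffleGrouping();
            builder.addBolt(aggregate, 6).fieldsGrouping(new Fields("id"));
        }
    }
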
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
new file mode 100644
index 0000000..dc702a3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Constants;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.coordination.BatchBoltExecutor;
+import org.apache.storm.coordination.CoordinatedBolt;
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec;
+import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
+import org.apache.storm.coordination.IBatchBolt;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.topology.BaseConfigurationDeclarer;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.InputDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+// Trident subsumes the functionality provided by this class, so it's deprecated
+@Deprecated
+public class LinearDRPCTopologyBuilder {
+ String _function;
+ List<Component> _components = new ArrayList<Component>();
+
+
+ public LinearDRPCTopologyBuilder(String function) {
+ _function = function;
+ }
+
+ public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
+ return addBolt(new BatchBoltExecutor(bolt), parallelism);
+ }
+
+ public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
+ return addBolt(bolt, 1);
+ }
+
+ @Deprecated
+ public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
+ if(parallelism==null) parallelism = 1;
+ Component component = new Component(bolt, parallelism.intValue());
+ _components.add(component);
+ return new InputDeclarerImpl(component);
+ }
+
+ @Deprecated
+ public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
+ return addBolt(bolt, null);
+ }
+
+ public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
+ return addBolt(new BasicBoltExecutor(bolt), parallelism);
+ }
+
+ public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
+ return addBolt(bolt, null);
+ }
+
+ public StormTopology createLocalTopology(ILocalDRPC drpc) {
+ return createTopology(new DRPCSpout(_function, drpc));
+ }
+
+ public StormTopology createRemoteTopology() {
+ return createTopology(new DRPCSpout(_function));
+ }
+
+
+ private StormTopology createTopology(DRPCSpout spout) {
+ final String SPOUT_ID = "spout";
+ final String PREPARE_ID = "prepare-request";
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(SPOUT_ID, spout);
+ builder.setBolt(PREPARE_ID, new PrepareRequest())
+ .noneGrouping(SPOUT_ID);
+ int i=0;
+ for(; i<_components.size();i++) {
+ Component component = _components.get(i);
+
+ Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
+ if (i==1) {
+ source.put(boltId(i-1), SourceArgs.single());
+ } else if (i>=2) {
+ source.put(boltId(i-1), SourceArgs.all());
+ }
+ IdStreamSpec idSpec = null;
+ if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
+ idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
+ }
+ BoltDeclarer declarer = builder.setBolt(
+ boltId(i),
+ new CoordinatedBolt(component.bolt, source, idSpec),
+ component.parallelism);
+
+ for(Map<String, Object> conf: component.componentConfs) {
+ declarer.addConfigurations(conf);
+ }
+
+ if(idSpec!=null) {
+ declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
+ }
+ if(i==0 && component.declarations.isEmpty()) {
+ declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
+ } else {
+ String prevId;
+ if(i==0) {
+ prevId = PREPARE_ID;
+ } else {
+ prevId = boltId(i-1);
+ }
+ for(InputDeclaration declaration: component.declarations) {
+ declaration.declare(prevId, declarer);
+ }
+ }
+ if(i>0) {
+ declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID);
+ }
+ }
+
+ IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
+ OutputFieldsGetter getter = new OutputFieldsGetter();
+ lastBolt.declareOutputFields(getter);
+ Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
+ if(streams.size()!=1) {
+ throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
+ }
+ String outputStream = streams.keySet().iterator().next();
+ List<String> fields = streams.get(outputStream).get_output_fields();
+ if(fields.size()!=2) {
+ throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
+ }
+
+ builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
+ .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
+ .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
+ i++;
+ builder.setBolt(boltId(i), new ReturnResults())
+ .noneGrouping(boltId(i-1));
+ return builder.createTopology();
+ }
+
+ private static String boltId(int index) {
+ return "bolt" + index;
+ }
+
+ private static class Component {
+ public IRichBolt bolt;
+ public int parallelism;
+ public List<Map<String, Object>> componentConfs = new ArrayList<>();
+ public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
+
+ public Component(IRichBolt bolt, int parallelism) {
+ this.bolt = bolt;
+ this.parallelism = parallelism;
+ }
+ }
+
+ private static interface InputDeclaration {
+ public void declare(String prevComponent, InputDeclarer declarer);
+ }
+
+ private static class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer {
+ Component _component;
+
+ public InputDeclarerImpl(Component component) {
+ _component = component;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.fieldsGrouping(prevComponent, fields);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.fieldsGrouping(prevComponent, streamId, fields);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer globalGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.globalGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer globalGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.globalGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer shuffleGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.shuffleGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.shuffleGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer localOrShuffleGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer noneGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.noneGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer noneGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.noneGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer allGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.allGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer allGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.allGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer directGrouping() {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.directGrouping(prevComponent);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer directGrouping(final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.directGrouping(prevComponent, streamId);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) {
+ return customGrouping(new PartialKeyGrouping(fields));
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) {
+ return customGrouping(streamId, new PartialKeyGrouping(fields));
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.customGrouping(prevComponent, grouping);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer customGrouping(final String streamId, final CustomStreamGrouping grouping) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(String prevComponent, InputDeclarer declarer) {
+ declarer.customGrouping(prevComponent, streamId, grouping);
+ }
+ });
+ return this;
+ }
+
+ private void addDeclaration(InputDeclaration declaration) {
+ _component.declarations.add(declaration);
+ }
+
+ @Override
+ public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) {
+ _component.componentConfs.add(conf);
+ return this;
+ }
+ }
+}
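
Putting the pieces together, the builder wraps user bolts in CoordinatedBolt and inserts PrepareRequest, JoinResult, and ReturnResults around them. A minimal end-to-end sketch in the style of Storm's DRPC documentation; note that the last bolt must emit exactly two fields, the request id followed by the result, as enforced in createTopology above:

    import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class ExclamationDRPCTopology {
        // Last (and only) bolt in the chain: emits the request id, then the result
        public static class ExclaimBolt extends BaseBasicBolt {
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                Object requestId = tuple.getValue(0);
                String arg = tuple.getString(1);
                collector.emit(new Values(requestId, arg + "!"));
            }
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "result"));
            }
        }

        public static StormTopology build() {
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
            builder.addBolt(new ExclaimBolt(), 3);
            return builder.createRemoteTopology();
        }
    }
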
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
new file mode 100644
index 0000000..06e576c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.storm.utils.Utils;
+
+
+public class PrepareRequest extends BaseBasicBolt {
+ public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
+ public static final String RETURN_STREAM = "ret";
+ public static final String ID_STREAM = "id";
+
+ Random rand;
+
+ @Override
+ public void prepare(Map map, TopologyContext context) {
+ rand = new Random();
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String args = tuple.getString(0);
+ String returnInfo = tuple.getString(1);
+ long requestId = rand.nextLong();
+ collector.emit(ARGS_STREAM, new Values(requestId, args));
+ collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
+ collector.emit(ID_STREAM, new Values(requestId));
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
+ declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
+ declarer.declareStream(ID_STREAM, new Fields("request"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
new file mode 100644
index 0000000..8f03356
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServiceRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+
+
+public class ReturnResults extends BaseRichBolt {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ static final long serialVersionUID = -774882142710631591L;
+
+ public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
+ OutputCollector _collector;
+ boolean local;
+ Map _conf;
+ Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ _conf = stormConf;
+ _collector = collector;
+ local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String result = (String) input.getValue(0);
+ String returnInfo = (String) input.getValue(1);
+ if (returnInfo!=null) {
+ Map retMap = null;
+ try {
+ retMap = (Map) JSONValue.parseWithException(returnInfo);
+ } catch (ParseException e) {
+ LOG.error("Parseing returnInfo failed", e);
+ _collector.fail(input);
+ return;
+ }
+ final String host = (String) retMap.get("host");
+ final int port = ObjectReader.getInt(retMap.get("port"));
+ String id = (String) retMap.get("id");
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
+ } else {
+ List server = new ArrayList() {{
+ add(host);
+ add(port);
+ }};
+
+ if(!_clients.containsKey(server)) {
+ try {
+ _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
+ } catch (TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ client = _clients.get(server);
+ }
+
+
+ int retryCnt = 0;
+ int maxRetries = 3;
+ while (retryCnt < maxRetries) {
+ retryCnt++;
+ try {
+ client.result(id, result);
+ _collector.ack(input);
+ break;
+ } catch (AuthorizationException aze) {
+ LOG.error("Not authorized to return results to DRPC server", aze);
+ _collector.fail(input);
+ throw new RuntimeException(aze);
+ } catch (TException tex) {
+ if (retryCnt >= maxRetries) {
+ LOG.error("Failed to return results to DRPC server", tex);
+ _collector.fail(input);
+ }
+ reconnectClient((DRPCInvocationsClient) client);
+ }
+ }
+ }
+ }
+
+ private void reconnectClient(DRPCInvocationsClient client) {
+ if (client instanceof DRPCInvocationsClient) {
+ try {
+ LOG.info("reconnecting... ");
+ client.reconnectClient(); //Blocking call
+ } catch (TException e2) {
+ LOG.error("Failed to connect to DRPC server", e2);
+ }
+ }
+ }
+ @Override
+ public void cleanup() {
+ for(DRPCInvocationsClient c: _clients.values()) {
+ c.close();
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
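
The "return-info" string that ReturnResults parses is the JSON map that DRPCSpout builds above, with "id", "host", and "port" keys. A small sketch of that round trip with json-simple; the host value is illustrative, and 3773 is assumed to be the default drpc.invocations.port:

    import java.util.HashMap;
    import java.util.Map;
    import org.json.simple.JSONValue;

    public class ReturnInfoDemo {
        public static void main(String[] args) throws Exception {
            // Same keys DRPCSpout puts into the "return-info" field and
            // ReturnResults reads back out; values here are illustrative
            Map<String, Object> returnInfo = new HashMap<>();
            returnInfo.put("id", "1234");
            returnInfo.put("host", "drpc1.example.com");
            returnInfo.put("port", 3773); // assumed default drpc.invocations.port

            String encoded = JSONValue.toJSONString(returnInfo);
            Map decoded = (Map) JSONValue.parseWithException(encoded);
            System.out.println(decoded.get("host") + ":" + decoded.get("port"));
        }
    }
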
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
new file mode 100644
index 0000000..d89ac69
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+ protected final WorkerState workerData;
+ protected final WorkerTopologyContext workerTopologyContext;
+ protected final List<Long> executorId;
+ protected final List<Integer> taskIds;
+ protected final String componentId;
+ protected final AtomicBoolean openOrPrepareWasCalled;
+ protected final Map stormConf;
+ protected final Map conf;
+ protected final String stormId;
+ protected final HashMap sharedExecutorData;
+ protected final AtomicBoolean stormActive;
+ protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+ protected final Runnable suicideFn;
+ protected final IStormClusterState stormClusterState;
+ protected final Map<Integer, String> taskToComponent;
+ protected CommonStats stats;
+ protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+ protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+ protected final ReportErrorAndDie reportErrorDie;
+ protected final Callable<Boolean> sampler;
+ protected ExecutorTransfer executorTransfer;
+ protected final String type;
+ protected final AtomicBoolean throttleOn;
+
+ protected final IReportError reportError;
+ protected final Random rand;
+ protected final DisruptorQueue transferQueue;
+ protected final DisruptorQueue receiveQueue;
+ protected Map<Integer, Task> idToTask;
+ protected final Map<String, String> credentials;
+ protected final Boolean isDebug;
+ protected final Boolean hasEventLoggers;
+ protected String hostname;
+
+ protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
+ this.workerData = workerData;
+ this.executorId = executorId;
+ this.workerTopologyContext = workerData.getWorkerTopologyContext();
+ this.taskIds = StormCommon.executorIdToTasks(executorId);
+ this.componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+ this.openOrPrepareWasCalled = new AtomicBoolean(false);
+ this.stormConf = normalizedComponentConf(workerData.getTopologyConf(), workerTopologyContext, componentId);
+ this.receiveQueue = (workerData.getExecutorReceiveQueueMap().get(executorId));
+ this.stormId = workerData.getTopologyId();
+ this.conf = workerData.getConf();
+ this.sharedExecutorData = new HashMap();
+ this.stormActive = workerData.getIsTopologyActive();
+ this.stormComponentDebug = workerData.getStormComponentToDebug();
+
+ this.transferQueue = mkExecutorBatchQueue(stormConf, executorId);
+ this.executorTransfer = new ExecutorTransfer(workerData, transferQueue, stormConf);
+
+ this.suicideFn = workerData.getSuicideCallback();
+ try {
+ this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.getStateStorage(), Utils.getWorkerACL(stormConf),
+ new ClusterStateContext(DaemonType.WORKER));
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ StormTopology topology = workerTopologyContext.getRawTopology();
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ Map<String, Bolt> bolts = topology.get_bolts();
+ if (spouts.containsKey(componentId)) {
+ this.type = StatsUtil.SPOUT;
+ this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(stormConf),ObjectReader.getInt(stormConf.get(Config.NUM_STAT_BUCKETS)));
+ } else if (bolts.containsKey(componentId)) {
+ this.type = StatsUtil.BOLT;
+ this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(stormConf),ObjectReader.getInt(stormConf.get(Config.NUM_STAT_BUCKETS)));
+ } else {
+ throw new RuntimeException("Could not find " + componentId + " in " + topology);
+ }
+
+ this.intervalToTaskToMetricToRegistry = new HashMap<>();
+ this.taskToComponent = workerData.getTaskToComponent();
+ this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, stormConf);
+ this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
+ this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
+ this.sampler = ConfigUtils.mkStatsSampler(stormConf);
+ this.throttleOn = workerData.getThrottleOn();
+ this.isDebug = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.rand = new Random(Utils.secureRandomLong());
+ this.credentials = credentials;
+ this.hasEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+ try {
+ this.hostname = Utils.hostname();
+ } catch (UnknownHostException ignored) {
+ this.hostname = "";
+ }
+ }
+
+ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
+ Executor executor;
+
+ WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext();
+ List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
+ String componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+
+ String type = getExecutorType(workerTopologyContext, componentId);
+ if (StatsUtil.SPOUT.equals(type)) {
+ executor = new SpoutExecutor(workerState, executorId, credentials);
+ executor.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
+ } else {
+ executor = new BoltExecutor(workerState, executorId, credentials);
+ executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
+ }
+
+ Map<Integer, Task> idToTask = new HashMap<>();
+ for (Integer taskId : taskIds) {
+ try {
+ Task task = new Task(executor, taskId);
+ executor.sendUnanchored(
+ task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+ idToTask.put(taskId, task);
+ } catch (IOException ex) {
+ throw Utils.wrapInRuntime(ex);
+ }
+ }
+
+ executor.idToTask = idToTask;
+ return executor;
+ }
+
+ private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
+ StormTopology topology = workerTopologyContext.getRawTopology();
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ Map<String, Bolt> bolts = topology.get_bolts();
+ if (spouts.containsKey(componentId)) {
+ return StatsUtil.SPOUT;
+ } else if (bolts.containsKey(componentId)) {
+ return StatsUtil.BOLT;
+ } else {
+ throw new RuntimeException("Could not find " + componentId + " in " + topology);
+ }
+ }
+
+ /**
+ * Separated from mkExecutor so that the executor transfer in executor data can be replaced for testing.
+ */
+ public ExecutorShutdown execute() throws Exception {
+ LOG.info("Loading executor tasks " + componentId + ":" + executorId);
+
+ registerBackpressure();
+ Utils.SmartThread systemThreads =
+ Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
+
+ String handlerName = componentId + "-executor" + executorId;
+ Utils.SmartThread handlers =
+ Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
+ setupTicks(StatsUtil.SPOUT.equals(type));
+
+ LOG.info("Finished loading executor " + componentId + ":" + executorId);
+ return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ }
+
+ public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
+ ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
+ for (AddressedTuple addressedTuple : addressedTuples) {
+ TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ int taskId = addressedTuple.getDest();
+ if (isDebug) {
+ LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ }
+ if (taskId != AddressedTuple.BROADCAST_DEST) {
+ tupleActionFn(taskId, tuple);
+ } else {
+ for (Integer t : taskIds) {
+ tupleActionFn(t, tuple);
+ }
+ }
+ }
+ }
+
+ public void metricsTick(Task taskData, TupleImpl tuple) {
+ try {
+ Integer interval = tuple.getInteger(0);
+ int taskId = taskData.getTaskId();
+ Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
+ Map<String, IMetric> nameToRegistry = null;
+ if (taskToMetricToRegistry != null) {
+ nameToRegistry = taskToMetricToRegistry.get(taskId);
+ }
+ if (nameToRegistry != null) {
+ IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+ hostname, workerTopologyContext.getThisWorkerPort(),
+ componentId, taskId, Time.currentTimeSecs(), interval);
+ List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
+ for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
+ IMetric metric = entry.getValue();
+ Object value = metric.getValueAndReset();
+ if (value != null) {
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+ dataPoints.add(dataPoint);
+ }
+ }
+ if (!dataPoints.isEmpty()) {
+ sendUnanchored(taskData, Constants.METRICS_STREAM_ID,
+ new Values(taskInfo, dataPoints), executorTransfer);
+ }
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ protected void setupMetrics() {
+ for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
+ StormTimer timerTask = workerData.getUserTimer();
+ timerTask.scheduleRecurring(interval, interval, new Runnable() {
+ @Override
+ public void run() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval),
+ (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+ List<AddressedTuple> metricsTickTuple =
+ Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ receiveQueue.publish(metricsTickTuple);
+ }
+ });
+ }
+ }
+
+ public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
+ Tuple tuple = task.getTuple(stream, values);
+ List<Integer> tasks = task.getOutgoingTasks(stream, values);
+ for (Integer t : tasks) {
+ transfer.transfer(t, tuple);
+ }
+ }
+
+ /**
+ * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+ */
+ public void sendToEventLogger(Executor executor, Task taskData, List values,
+ String componentId, Object messageId, Random random) {
+ Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
+ DebugOptions debugOptions = componentDebug.get(componentId);
+ if (debugOptions == null) {
+ debugOptions = componentDebug.get(executor.getStormId());
+ }
+ double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
+ if (spct > 0 && (random.nextDouble() * 100) < spct) {
+ sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
+ new Values(componentId, messageId, System.currentTimeMillis(), values),
+ executor.getExecutorTransfer());
+ }
+ }
+
+ private void registerBackpressure() {
+ receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
+ @Override
+ public void highWaterMark() throws Exception {
+ LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
+ }
+
+ @Override
+ public void lowWaterMark() throws Exception {
+ LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
+ }
+ });
+ receiveQueue.setHighWaterMark(ObjectReader.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
+ receiveQueue.setLowWaterMark(ObjectReader.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
+ receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false));
+ }
+
+ protected void setupTicks(boolean isSpout) {
+ final Integer tickTimeSecs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
+ boolean enableMessageTimeout = (Boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
+ if (tickTimeSecs != null) {
+ if (Utils.isSystemId(componentId) || (!enableMessageTimeout && isSpout)) {
+ LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId);
+ } else {
+ StormTimer timerTask = workerData.getUserTimer();
+ timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable() {
+ @Override
+ public void run() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
+ (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ List<AddressedTuple> tickTuple =
+ Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ receiveQueue.publish(tickTuple);
+ }
+ });
+ }
+ }
+ }
+
+
+ private DisruptorQueue mkExecutorBatchQueue(Map stormConf, List<Long> executorId) {
+ int sendSize = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+ int waitTimeOutMs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
+ int batchSize = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
+ int batchTimeOutMs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+ return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
+ sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+ }
+
+ /**
+ * Returns map of stream id to component id to grouper
+ */
+ private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(
+ WorkerTopologyContext workerTopologyContext, String componentId, Map stormConf) {
+ Map<String, Map<String, LoadAwareCustomStreamGrouping>> ret = new HashMap<>();
+
+ Map<String, Map<String, Grouping>> outputGroupings = workerTopologyContext.getTargets(componentId);
+ for (Map.Entry<String, Map<String, Grouping>> entry : outputGroupings.entrySet()) {
+ String streamId = entry.getKey();
+ Map<String, Grouping> componentGrouping = entry.getValue();
+ Fields outFields = workerTopologyContext.getComponentOutputFields(componentId, streamId);
+ Map<String, LoadAwareCustomStreamGrouping> componentGrouper = new HashMap<String, LoadAwareCustomStreamGrouping>();
+ for (Map.Entry<String, Grouping> cg : componentGrouping.entrySet()) {
+ String component = cg.getKey();
+ Grouping grouping = cg.getValue();
+ List<Integer> outTasks = workerTopologyContext.getComponentTasks(component);
+ LoadAwareCustomStreamGrouping grouper = GrouperFactory.mkGrouper(
+ workerTopologyContext, componentId, streamId, outFields, grouping, outTasks, stormConf);
+ componentGrouper.put(component, grouper);
+ }
+ if (!componentGrouper.isEmpty()) {
+ ret.put(streamId, componentGrouper);
+ }
+ }
+
+ // Streams that are declared but have no consumers get a null entry, so emits to
+ // them can be recognized (and dropped) without a missing-key lookup.
+ for (String stream : workerTopologyContext.getComponentCommon(componentId).get_streams().keySet()) {
+ if (!ret.containsKey(stream)) {
+ ret.put(stream, null);
+ }
+ }
+
+ return ret;
+ }
+
+ private Map normalizedComponentConf(Map stormConf, WorkerTopologyContext topologyContext, String componentId) {
+ // Per-component JSON conf may only override a small whitelist of keys; every other
+ // Config key is stripped before merging over the topology conf.
+ List<Object> keysToRemove = All_CONFIGS();
+ keysToRemove.remove(Config.TOPOLOGY_DEBUG);
+ keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
+ keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
+ keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
+ keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
+ keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
+ keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
+ keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+
+ Map<Object, Object> componentConf;
+ String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
+ if (specJsonConf != null) {
+ try {
+ componentConf = (Map<Object, Object>) JSONValue.parseWithException(specJsonConf);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ for (Object p : keysToRemove) {
+ componentConf.remove(p);
+ }
+ } else {
+ componentConf = new HashMap<>();
+ }
+
+ Map<Object, Object> ret = new HashMap<>();
+ ret.putAll(stormConf);
+ ret.putAll(componentConf);
+
+ return ret;
+ }
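+ // Example of how a per-component override reaches get_json_conf() (a sketch; "counter"
+ // and CounterBolt are hypothetical):
+ //   builder.setBolt("counter", new CounterBolt(), 4)
+ //          .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);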
+
+ // =============================================================================
+ // ============================ getter methods =================================
+ // =============================================================================
+
+ public List<Long> getExecutorId() {
+ return executorId;
+ }
+
+ public List<Integer> getTaskIds() {
+ return taskIds;
+ }
+
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public AtomicBoolean getOpenOrPrepareWasCalled() {
+ return openOrPrepareWasCalled;
+ }
+
+ public Map getStormConf() {
+ return stormConf;
+ }
+
+ public String getStormId() {
+ return stormId;
+ }
+
+ public CommonStats getStats() {
+ return stats;
+ }
+
+ public AtomicBoolean getThrottleOn() {
+ return throttleOn;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public Boolean getIsDebug() {
+ return isDebug;
+ }
+
+ public ExecutorTransfer getExecutorTransfer() {
+ return executorTransfer;
+ }
+
+ public IReportError getReportError() {
+ return reportError;
+ }
+
+ public WorkerTopologyContext getWorkerTopologyContext() {
+ return workerTopologyContext;
+ }
+
+ public Callable<Boolean> getSampler() {
+ return sampler;
+ }
+
+ public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() {
+ return stormComponentDebug;
+ }
+
+ public DisruptorQueue getReceiveQueue() {
+ return receiveQueue;
+ }
+
+ public boolean getBackpressure() {
+ // The backpressure callbacks above toggle the receive queue's throttle flag.
+ return receiveQueue.getThrottleOn();
+ }
+
+ public DisruptorQueue getTransferWorkerQueue() {
+ return transferQueue;
+ }
+
+ public IStormClusterState getStormClusterState() {
+ return stormClusterState;
+ }
+
+ public WorkerState getWorkerData() {
+ return workerData;
+ }
+
+ public Map<String, Map<String, LoadAwareCustomStreamGrouping>> getStreamToComponentToGrouper() {
+ return streamToComponentToGrouper;
+ }
+
+ public HashMap getSharedExecutorData() {
+ return sharedExecutorData;
+ }
+
+ public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
+ return intervalToTaskToMetricToRegistry;
+ }
+
+ @VisibleForTesting
+ public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
+ this.executorTransfer = executorTransfer;
+ }
+
+ private static List<Object> All_CONFIGS() {
+ List<Object> ret = new ArrayList<>();
+ // Config's public fields are its static config-key constants, so a static read
+ // (field.get(null)) collects every known key.
+ for (Field field : Config.class.getFields()) {
+ try {
+ ret.add(field.get(null));
+ } catch (IllegalArgumentException | IllegalAccessException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
new file mode 100644
index 0000000..144ee1b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Constants;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
+ private final Executor executor;
+ private final List<Utils.SmartThread> threads;
+ private final Map<Integer, Task> taskDatas;
+
+ public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+ this.executor = executor;
+ this.threads = threads;
+ this.taskDatas = taskDatas;
+ }
+
+ @Override
+ public ExecutorStats renderStats() {
+ return executor.getStats().renderStats();
+ }
+
+ @Override
+ public List<Long> getExecutorId() {
+ return executor.getExecutorId();
+ }
+
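+ // Pushes the new credentials into the executor's own receive queue as a broadcast
+ // system tuple, so they are applied on the executor thread rather than on this one.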
+ @Override
+ public void credentialsChanged(Credentials credentials) {
+ TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), (int) Constants.SYSTEM_TASK_ID,
+ Constants.CREDENTIALS_CHANGED_STREAM_ID);
+ List<AddressedTuple> addressedTuple = Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ executor.getReceiveQueue().publish(addressedTuple);
+ }
+
+ @Override
+ public boolean getBackPressureFlag() {
+ return executor.getBackpressure();
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ LOG.info("Shutting down executor {}:{}", executor.getComponentId(), executor.getExecutorId());
+ executor.getReceiveQueue().haltWithInterrupt();
+ executor.getTransferWorkerQueue().haltWithInterrupt();
+ for (Utils.SmartThread t : threads) {
+ t.interrupt();
+ }
+ for (Utils.SmartThread t : threads) {
+ LOG.debug("Executor {}:{} joining thread {}", executor.getComponentId(), executor.getExecutorId(), t.getName());
+ t.join();
+ }
+ executor.getStats().cleanupStats();
+ for (Task task : taskDatas.values()) {
+ TopologyContext userContext = task.getUserContext();
+ for (ITaskHook hook : userContext.getHooks()) {
+ hook.cleanup();
+ }
+ }
+ executor.getStormClusterState().disconnect();
+ // Only close/cleanup components whose open/prepare was actually called.
+ if (executor.getOpenOrPrepareWasCalled().get()) {
+ for (Task task : taskDatas.values()) {
+ Object object = task.getTaskObject();
+ if (object instanceof ISpout) {
+ ((ISpout) object).close();
+ } else if (object instanceof IBolt) {
+ ((IBolt) object).cleanup();
+ } else {
+ LOG.error("Unknown component object: {}", object);
+ }
+ }
+ }
+ LOG.info("Shut down executor {}:{}", executor.getComponentId(), executor.getExecutorId());
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
new file mode 100644
index 0000000..0012e4d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.EventHandler;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class ExecutorTransfer implements EventHandler, Callable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
+
+ private final WorkerState workerData;
+ private final DisruptorQueue batchTransferQueue;
+ private final Map stormConf;
+ private final KryoTupleSerializer serializer;
+ private final MutableObject cachedEmit;
+ private final boolean isDebug;
+
+ public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map stormConf) {
+ this.workerData = workerData;
+ this.batchTransferQueue = batchTransferQueue;
+ this.stormConf = stormConf;
+ this.serializer = new KryoTupleSerializer(stormConf, workerData.getWorkerTopologyContext());
+ this.cachedEmit = new MutableObject(new ArrayList<>());
+ this.isDebug = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+ }
+
+ public void transfer(int task, Tuple tuple) {
+ AddressedTuple val = new AddressedTuple(task, tuple);
+ if (isDebug) {
+ LOG.info("TRANSFERRING tuple {}", val);
+ }
+ batchTransferQueue.publish(val);
+ }
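+ // Usage sketch (assumed caller, not part of this file): the executor's output collector
+ // invokes transfer once per routed task, roughly
+ //   executorTransfer.transfer(targetTaskId, outputTuple);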
+
+ @VisibleForTesting
+ public DisruptorQueue getBatchTransferQueue() {
+ return this.batchTransferQueue;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ // Blocks until a batch is available on the send queue, then dispatches it to
+ // onEvent below; the owning thread invokes call() in a loop.
+ batchTransferQueue.consumeBatchWhenAvailable(this);
+ return 0L;
+ }
+
+ public String getName() {
+ return batchTransferQueue.getName();
+ }
+
+ @Override
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
+ // Accumulate addressed tuples until the disruptor signals end-of-batch, then hand
+ // the whole batch to the worker for serialization and transfer in one call.
+ ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
+ cachedEvents.add(event);
+ if (endOfBatch) {
+ workerData.transfer(serializer, cachedEvents);
+ cachedEmit.setObject(new ArrayList<>());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
new file mode 100644
index 0000000..441fdff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+
+import java.util.List;
+
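+/**
+ * Handle the worker keeps for each live executor: render its stats, push credential
+ * updates, and read its current backpressure flag.
+ */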
+public interface IRunningExecutor {
+
+ ExecutorStats renderStats();
+ List<Long> getExecutorId();
+ void credentialsChanged(Credentials credentials);
+ boolean getBackPressureFlag();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
new file mode 100644
index 0000000..4f2811b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.executor;
+
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RegisteredGlobalState;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LocalExecutor {
+
+ private static volatile String trackId = null;
+
+ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> initialCredentials)
+ throws Exception {
+ Executor executor = Executor.mkExecutor(workerState, executorId, initialCredentials);
+ executor.setLocalExecutorTransfer(new ExecutorTransfer(workerState, executor.getTransferWorkerQueue(),
+ executor.getStormConf()) {
+ @Override
+ public void transfer(int task, Tuple tuple) {
+ if (null != trackId) {
+ ((AtomicInteger) ((Map) RegisteredGlobalState.getState(trackId)).get("transferred")).incrementAndGet();
+ }
+ super.transfer(task, tuple);
+ }
+ });
+ return executor;
+ }
+
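+ // Testing sketch (assumed usage): local-mode tuple tracking registers a counter map
+ // and points executors at it, e.g.
+ //   String id = RegisteredGlobalState.registerState(trackedMap);
+ //   LocalExecutor.setTrackId(id);
+ // after which every transferred tuple increments trackedMap.get("transferred").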
+ public static void setTrackId(String trackId) {
+ LocalExecutor.trackId = trackId;
+ }
+
+ public static void clearTrackId() {
+ LocalExecutor.trackId = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
new file mode 100644
index 0000000..4b6d0fa
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.io.Serializable;
+import java.util.List;
+
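+/**
+ * Plain data holder for a tuple emitted by a spout executor (task, stream, message id,
+ * values, emit timestamp), kept so the tuple can later be acked, failed, or timed out.
+ */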
+public class TupleInfo implements Serializable {
+
+ private static final long serialVersionUID = -3348670497595864118L;
+
+ private int taskId;
+ private Object messageId;
+ private String stream;
+ private List<Object> values;
+ private long timestamp;
+ private String id;
+
+ public Object getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(Object messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
+
+ public void setValues(List<Object> values) {
+ this.values = values;
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this,
+ ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+}