You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:28 UTC

[39/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
new file mode 100644
index 0000000..5abb04a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Config;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ExtendedThreadPoolExecutor;
+import org.apache.storm.utils.ServiceRegistry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+
+public class DRPCSpout extends BaseRichSpout {
+    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+    static final long serialVersionUID = 2387848310969237877L;
+
+    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
+    
+    // Collector handed to open(); used by nextTuple() to emit request tuples.
+    SpoutOutputCollector _collector;
+    // Clients for the remote DRPC servers. Adder tasks append to this list from the
+    // background executor, so accesses are wrapped in synchronized (_clients).
+    List<DRPCInvocationsClient> _clients = new ArrayList<>();
+    // Outstanding background tasks (connects and reconnects); created in open() in remote mode only.
+    transient LinkedList<Future<Void>> _futures = null;
+    // Executor that runs the connect/reconnect tasks ("_backround" is a misspelling of
+    // "background"); created in open() in remote mode only.
+    transient ExecutorService _backround = null;
+    // Name of the DRPC function whose requests this spout fetches.
+    String _function;
+    // Service id of the in-process DRPC server; null means the spout talks to remote servers.
+    String _local_drpc_id = null;
+    
+    /**
+     * Message id attached to every emitted request tuple. It carries the DRPC request id
+     * together with the position in {@code _clients} of the client the request came from
+     * (always 0 in local mode), so that {@code fail} can report back to the same server.
+     */
+    private static class DRPCMessageId {
+        final String id;
+        final int index;
+
+        public DRPCMessageId(String id, int index) {
+            this.index = index;
+            this.id = id;
+        }
+    }
+    
+    
+    /**
+     * Creates a spout that fetches requests for the given function from remote DRPC servers.
+     * The server list and port are read from the topology configuration when the spout is opened.
+     *
+     * @param function name of the DRPC function to serve
+     */
+    public DRPCSpout(String function) {
+        this._function = function;
+    }
+
+    /**
+     * Creates a spout that fetches requests for the given function from an in-process DRPC
+     * server. Only the server's service id is stored; the server itself is looked up through
+     * {@code ServiceRegistry} whenever it is needed.
+     *
+     * @param function name of the DRPC function to serve
+     * @param drpc     local DRPC server whose service id is recorded
+     */
+    public DRPCSpout(String function, ILocalDRPC drpc) {
+        this(function);
+        this._local_drpc_id = drpc.getServiceId();
+    }
+
+    /**
+     * @return the name of the DRPC function this spout fetches requests for
+     */
+    public String get_function() {
+        return this._function;
+    }
+
+    /**
+     * Background task that creates a {@code DRPCInvocationsClient} for one DRPC server and
+     * registers it in {@code _clients}.
+     */
+    private class Adder implements Callable<Void> {
+        private final String server;
+        private final int port;
+        private final Map conf;
+
+        /**
+         * @param server host name of the DRPC server
+         * @param port   invocations port of the DRPC server
+         * @param conf   topology configuration handed to the client
+         */
+        public Adder(String server, int port, Map conf) {
+            this.conf = conf;
+            this.port = port;
+            this.server = server;
+        }
+
+        /**
+         * Builds the client and appends it to the shared client list.
+         *
+         * @return always {@code null}
+         * @throws Exception whatever the client constructor throws
+         */
+        @Override
+        public Void call() throws Exception {
+            final DRPCInvocationsClient created = new DRPCInvocationsClient(conf, server, port);
+            // The spout thread reads _clients concurrently, so go through its monitor.
+            synchronized (_clients) {
+                _clients.add(created);
+            }
+            return null;
+        }
+    }
+
+    private void reconnectAsync(final DRPCInvocationsClient client) {
+        _futures.add(_backround.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                client.reconnectClient();
+                return null;
+            }
+        }));
+    }
+
+    /**
+     * Reconnects the given client on the calling thread. A {@code TException} raised by the
+     * reconnect is logged and not propagated.
+     *
+     * @param client client to reconnect
+     */
+    private void reconnectSync(DRPCInvocationsClient client) {
+        LOG.info("reconnecting... ");
+        try {
+            // Runs on the caller's thread and returns only once the attempt has finished.
+            client.reconnectClient();
+        } catch (TException reconnectFailure) {
+            LOG.error("Failed to connect to DRPC server", reconnectFailure);
+        }
+    }
+
+    private void checkFutures() {
+        Iterator<Future<Void>> i = _futures.iterator();
+        while (i.hasNext()) {
+            Future<Void> f = i.next();
+            if (f.isDone()) {
+                i.remove();
+            }
+            try {
+                f.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+ 
+    /**
+     * Stores the collector and, in remote mode, starts connecting to the DRPC servers in the
+     * background.
+     *
+     * <p>When this component has fewer tasks than there are DRPC servers, every task connects
+     * to all servers; otherwise each task connects to the single server selected by its task
+     * index modulo the number of servers. In local mode (a local DRPC id was supplied) nothing
+     * besides storing the collector happens.
+     *
+     * @param conf      topology configuration; {@code Config.DRPC_SERVERS} and
+     *                  {@code Config.DRPC_INVOCATIONS_PORT} are read from it in remote mode
+     * @param context   used to obtain this task's index and the component's task count
+     * @param collector collector used later to emit request tuples
+     * @throws RuntimeException if no DRPC servers are configured in remote mode
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        if (_local_drpc_id != null) {
+            // Local mode: the DRPC server is looked up on demand, nothing to connect to.
+            return;
+        }
+
+        _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<Runnable>());
+        _futures = new LinkedList<>();
+
+        final int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        final int taskIndex = context.getThisTaskIndex();
+
+        final int invocationsPort = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
+        final List<String> drpcServers = (List<String>) conf.get(Config.DRPC_SERVERS);
+        if (drpcServers == null || drpcServers.isEmpty()) {
+            throw new RuntimeException("No DRPC servers configured for topology");   
+        }
+
+        if (taskCount >= drpcServers.size()) {
+            // Enough tasks to cover every server: this task takes exactly one of them.
+            final String assigned = drpcServers.get(taskIndex % drpcServers.size());
+            _futures.add(_backround.submit(new Adder(assigned, invocationsPort, conf)));
+        } else {
+            // Fewer tasks than servers: every task has to poll all of them.
+            for (String server : drpcServers) {
+                _futures.add(_backround.submit(new Adder(server, invocationsPort, conf)));
+            }
+        }
+    }
+
+    /**
+     * Releases the resources acquired by {@code open}: stops the background executor (if one
+     * was created) and closes every DRPC client registered so far.
+     */
+    @Override
+    public void close() {
+        // Only present in remote mode after open(); stop it so no connect/reconnect task
+        // keeps running or registers a new client after the spout has been closed.
+        if (_backround != null) {
+            _backround.shutdownNow();
+        }
+
+        // Adder tasks append to _clients from other threads, so take a snapshot under the
+        // same monitor the rest of the class uses and close the clients outside the lock.
+        final List<DRPCInvocationsClient> toClose;
+        synchronized (_clients) {
+            toClose = new ArrayList<>(_clients);
+        }
+        for (DRPCInvocationsClient client : toClose) {
+            client.close();
+        }
+    }
+
+    /**
+     * Tries to fetch one DRPC request and emit it as a tuple of
+     * {@code (function args, JSON return info)}. In remote mode the registered clients are
+     * polled in order until one yields a request, after which the outstanding background
+     * tasks are checked; in local mode the in-process DRPC server is asked directly. When no
+     * request was obtained the method sleeps for one millisecond.
+     */
+    @Override
+    public void nextTuple() {
+        final boolean gotRequest;
+        if (_local_drpc_id == null) {
+            gotRequest = pollRemoteServers();
+            checkFutures();
+        } else {
+            gotRequest = pollLocalDrpc();
+        }
+        if (!gotRequest) {
+            Utils.sleep(1);
+        }
+    }
+
+    /**
+     * Polls the remote DRPC clients in list order and emits the first request found.
+     * Disconnected clients are skipped after scheduling a reconnect; fetch failures are
+     * logged (and, for Thrift-level errors, followed by a scheduled reconnect).
+     *
+     * @return {@code true} if a request was obtained from one of the clients
+     */
+    private boolean pollRemoteServers() {
+        boolean gotRequest = false;
+        final int size;
+        synchronized (_clients) {
+            size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+        }
+        for (int i = 0; i < size; i++) {
+            final DRPCInvocationsClient client;
+            synchronized (_clients) {
+                client = _clients.get(i);
+            }
+            if (!client.isConnected()) {
+                LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
+                reconnectAsync(client);
+                continue;
+            }
+            try {
+                DRPCRequest req = client.fetchRequest(_function);
+                if (req.get_request_id().length() > 0) {
+                    Map<String, Object> returnInfo =
+                        buildReturnInfo(req.get_request_id(), client.getHost(), client.getPort());
+                    gotRequest = true;
+                    // The client index lets fail() route the failure back to the same server.
+                    emitRequest(req, returnInfo, i);
+                    break;
+                }
+            } catch (AuthorizationException aze) {
+                reconnectAsync(client);
+                LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
+            } catch (TException e) {
+                reconnectAsync(client);
+                LOG.error("Failed to fetch DRPC result from DRPC server", e);
+            } catch (Exception e) {
+                LOG.error("Failed to fetch DRPC result from DRPC server", e);
+            }
+        }
+        return gotRequest;
+    }
+
+    /**
+     * Asks the in-process DRPC server for a request and emits it if there is one.
+     *
+     * @return {@code true} if a request was obtained
+     * @throws RuntimeException wrapping any Thrift or authorization failure
+     */
+    private boolean pollLocalDrpc() {
+        DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+        if (drpc == null) { // can happen during shutdown of drpc while topology is still up
+            return false;
+        }
+        try {
+            DRPCRequest req = drpc.fetchRequest(_function);
+            if (req.get_request_id().length() > 0) {
+                Map<String, Object> returnInfo = buildReturnInfo(req.get_request_id(), _local_drpc_id, 0);
+                emitRequest(req, returnInfo, 0);
+                return true;
+            }
+            return false;
+        } catch (AuthorizationException aze) {
+            throw new RuntimeException(aze);
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the return-info map ({@code id}, {@code host}, {@code port}) that is serialized
+     * to JSON and sent along with a request.
+     */
+    private Map<String, Object> buildReturnInfo(Object requestId, Object host, Object port) {
+        Map<String, Object> returnInfo = new HashMap<>();
+        returnInfo.put("id", requestId);
+        returnInfo.put("host", host);
+        returnInfo.put("port", port);
+        return returnInfo;
+    }
+
+    /**
+     * Emits one request tuple, tagged with a message id made of the request id and the
+     * index of the client it came from.
+     */
+    private void emitRequest(DRPCRequest req, Map<String, Object> returnInfo, int clientIndex) {
+        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
+                        new DRPCMessageId(req.get_request_id(), clientIndex));
+    }
+
+    /**
+     * No-op: this spout takes no action when an emitted request tuple is acked.
+     *
+     * @param msgId message id of the acked tuple (unused)
+     */
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    /**
+     * Reports a failed request tuple back to the DRPC server it was fetched from.
+     *
+     * <p>The call is attempted up to three times. After a Thrift failure a remote client is
+     * reconnected before the next attempt; the in-process DRPC service has no connection to
+     * re-establish, so it is simply retried. If the in-process service is no longer
+     * registered the failure cannot be delivered and is only logged.
+     *
+     * @param msgId the {@code DRPCMessageId} that was attached to the emitted tuple
+     * @throws RuntimeException if the server rejects the call as unauthorized
+     */
+    @Override
+    public void fail(Object msgId) {
+        DRPCMessageId did = (DRPCMessageId) msgId;
+        DistributedRPCInvocations.Iface client;
+
+        if (_local_drpc_id == null) {
+            // Adder tasks may still be appending; use the same monitor as every other access.
+            synchronized (_clients) {
+                client = _clients.get(did.index);
+            }
+        } else {
+            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+            if (client == null) {
+                // The local DRPC service can disappear while the topology is still running.
+                LOG.warn("Local DRPC service [{}] is not registered; cannot fail request {}", _local_drpc_id, did.id);
+                return;
+            }
+        }
+
+        final int maxRetries = 3;
+        for (int attempt = 1; attempt <= maxRetries; attempt++) {
+            try {
+                client.failRequest(did.id);
+                return;
+            } catch (AuthorizationException aze) {
+                LOG.error("Not authorized to failRequest from DRPC server", aze);
+                throw new RuntimeException(aze);
+            } catch (TException tex) {
+                if (attempt >= maxRetries) {
+                    LOG.error("Failed to fail request", tex);
+                    return;
+                }
+                // Only remote clients hold a connection that can be re-established.
+                if (client instanceof DRPCInvocationsClient) {
+                    reconnectSync((DRPCInvocationsClient) client);
+                }
+            }
+        }
+    }
+
+    /**
+     * Declares the two fields of every emitted tuple: the function arguments and the
+     * JSON-encoded return info.
+     *
+     * @param declarer declarer that receives the output schema
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        final Fields schema = new Fields("args", "return-info");
+        declarer.declare(schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
new file mode 100644
index 0000000..f57bbb1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Bolt that joins, per request id, the tuple carrying a DRPC result with the tuple carrying
+ * the matching return info, and emits {@code (result, return-info)} once both have arrived.
+ *
+ * <p>The request id is taken from field 0 of every incoming tuple. Tuples whose source
+ * component equals {@code returnComponent} are treated as return-info tuples; all others are
+ * treated as result tuples. Unmatched tuples are held in memory until their partner arrives.
+ */
+public class JoinResult extends BaseRichBolt {
+    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);
+
+    // Id of the component whose tuples carry the return info.
+    String returnComponent;
+    // Return-info tuples still waiting for their result, keyed by request id.
+    Map<Object, Tuple> returns = new HashMap<>();
+    // Result tuples still waiting for their return info, keyed by request id.
+    Map<Object, Tuple> results = new HashMap<>();
+    OutputCollector _collector;
+
+    /**
+     * @param returnComponent id of the component that emits the return-info tuples
+     */
+    public JoinResult(String returnComponent) {
+        this.returnComponent = returnComponent;
+    }
+
+    /**
+     * Stores the collector used for emitting and acking.
+     */
+    @Override
+    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    /**
+     * Buffers the tuple on the side it belongs to and, if its counterpart is already
+     * buffered, emits the joined tuple anchored on both inputs and acks them.
+     *
+     * @param tuple incoming tuple; field 0 is the request id, field 1 the payload
+     */
+    @Override
+    public void execute(Tuple tuple) {
+        Object requestId = tuple.getValue(0);
+        if (tuple.getSourceComponent().equals(returnComponent)) {
+            returns.put(requestId, tuple);
+        } else {
+            results.put(requestId, tuple);
+        }
+
+        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
+            Tuple result = results.remove(requestId);
+            Tuple returner = returns.remove(requestId);
+            // Parameterized logging copes with a null result value (the emit below does too)
+            // and skips the string conversion when debug logging is off.
+            LOG.debug("{}", result.getValue(1));
+            List<Tuple> anchors = new ArrayList<>();
+            anchors.add(result);
+            anchors.add(returner);
+            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
+            _collector.ack(result);
+            _collector.ack(returner);
+        }
+    }
+
+    /**
+     * Declares the output schema {@code (result, return-info)}.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("result", "return-info"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
new file mode 100644
index 0000000..39860e6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/KeyedFairBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.KeyedRoundRobinQueue;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Bolt wrapper that places every incoming tuple into a {@link KeyedRoundRobinQueue}, keyed by
+ * the tuple's first value, and hands the queued tuples to the wrapped bolt from a dedicated
+ * daemon thread.
+ */
+public class KeyedFairBolt implements IRichBolt, FinishedCallback {
+    // Bolt that actually processes the tuples.
+    IRichBolt _delegate;
+    // Queue between the caller of execute() and the draining thread.
+    KeyedRoundRobinQueue<Tuple> _rrQueue;
+    // Thread that takes tuples off the queue and feeds them to the delegate.
+    Thread _executor;
+    // The delegate viewed as a FinishedCallback, or null if it does not implement it.
+    FinishedCallback _callback;
+
+    /**
+     * @param delegate bolt to wrap
+     */
+    public KeyedFairBolt(IRichBolt delegate) {
+        this._delegate = delegate;
+    }
+
+    /**
+     * Wraps a basic bolt by first adapting it with a {@link BasicBoltExecutor}.
+     *
+     * @param delegate basic bolt to wrap
+     */
+    public KeyedFairBolt(IBasicBolt delegate) {
+        this(new BasicBoltExecutor(delegate));
+    }
+
+    /**
+     * Prepares the delegate, creates the queue and starts the daemon thread that drains it.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        if (_delegate instanceof FinishedCallback) {
+            _callback = (FinishedCallback) _delegate;
+        }
+        _delegate.prepare(stormConf, context, collector);
+        _rrQueue = new KeyedRoundRobinQueue<>();
+
+        final Runnable drainLoop = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (;;) {
+                        _delegate.execute(_rrQueue.take());
+                    }
+                } catch (InterruptedException ignored) {
+                    // cleanup() interrupts this thread to stop it; leaving run() ends the thread.
+                }
+            }
+        };
+        _executor = new Thread(drainLoop);
+        _executor.setDaemon(true);
+        _executor.start();
+    }
+
+    /**
+     * Enqueues the tuple under the key found in its first field; processing happens later on
+     * the draining thread.
+     */
+    @Override
+    public void execute(Tuple input) {
+        final Object queueKey = input.getValue(0);
+        _rrQueue.add(queueKey, input);
+    }
+
+    /**
+     * Interrupts the draining thread and then cleans up the delegate.
+     */
+    @Override
+    public void cleanup() {
+        _executor.interrupt();
+        _delegate.cleanup();
+    }
+
+    /**
+     * Forwards the output declaration to the delegate.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        _delegate.declareOutputFields(declarer);
+    }
+
+    /**
+     * Forwards the notification to the delegate when it implements {@code FinishedCallback};
+     * otherwise does nothing.
+     */
+    @Override
+    public void finishedId(Object id) {
+        if (_callback == null) {
+            return;
+        }
+        _callback.finishedId(id);
+    }
+
+    /**
+     * @return a new, empty configuration map
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return new HashMap<>();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
new file mode 100644
index 0000000..6f82f80
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCInputDeclarer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.ComponentConfigurationDeclarer;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Declares how a bolt in a linear DRPC topology subscribes to the component that precedes it.
+ *
+ * <p>Every grouping comes in two forms: one without a stream id and one that names the stream
+ * of the preceding component explicitly. All methods return this declarer so that calls can
+ * be chained.
+ */
+public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
+    /** Fields grouping on {@code fields}, without naming a stream. */
+    LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+    /** Fields grouping on {@code fields} of stream {@code streamId}. */
+    LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
+
+    /** Global grouping, without naming a stream. */
+    LinearDRPCInputDeclarer globalGrouping();
+    /** Global grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer globalGrouping(String streamId);
+
+    /** Shuffle grouping, without naming a stream. */
+    LinearDRPCInputDeclarer shuffleGrouping();
+    /** Shuffle grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer shuffleGrouping(String streamId);
+
+    /** Local-or-shuffle grouping, without naming a stream. */
+    LinearDRPCInputDeclarer localOrShuffleGrouping();
+    /** Local-or-shuffle grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
+
+    /** None grouping, without naming a stream. */
+    LinearDRPCInputDeclarer noneGrouping();
+    /** None grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer noneGrouping(String streamId);
+
+    /** All grouping, without naming a stream. */
+    LinearDRPCInputDeclarer allGrouping();
+    /** All grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer allGrouping(String streamId);
+
+    /** Direct grouping, without naming a stream. */
+    LinearDRPCInputDeclarer directGrouping();
+    /** Direct grouping on stream {@code streamId}. */
+    LinearDRPCInputDeclarer directGrouping(String streamId);
+
+    /** Partial-key grouping on {@code fields}, without naming a stream. */
+    LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+    /** Partial-key grouping on {@code fields} of stream {@code streamId}. */
+    LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
+
+    /** Grouping decided by the supplied {@code grouping}, without naming a stream. */
+    LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+    /** Grouping decided by the supplied {@code grouping} on stream {@code streamId}. */
+    LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
new file mode 100644
index 0000000..dc702a3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Constants;
+import org.apache.storm.ILocalDRPC;
+import org.apache.storm.coordination.BatchBoltExecutor;
+import org.apache.storm.coordination.CoordinatedBolt;
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec;
+import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
+import org.apache.storm.coordination.IBatchBolt;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.apache.storm.topology.BaseConfigurationDeclarer;
+import org.apache.storm.topology.BasicBoltExecutor;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.InputDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+// Trident subsumes the functionality provided by this class, so it's deprecated
+@Deprecated
+public class LinearDRPCTopologyBuilder {    
+    String _function;
+    List<Component> _components = new ArrayList<Component>();
+    
+    
+    /**
+     * Creates a builder for a topology that serves the given DRPC function.
+     *
+     * @param function name of the DRPC function; handed to the {@code DRPCSpout}
+     */
+    public LinearDRPCTopologyBuilder(String function) {
+        this._function = function;
+    }
+        
+    /**
+     * Appends a batch bolt to the chain, adapting it with a {@link BatchBoltExecutor}.
+     *
+     * @param bolt        batch bolt to append
+     * @param parallelism parallelism of the bolt; {@code null} means 1
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
+        final BatchBoltExecutor executor = new BatchBoltExecutor(bolt);
+        return addBolt(executor, parallelism);
+    }
+    
+    /**
+     * Appends a batch bolt to the chain with a parallelism of 1.
+     *
+     * @param bolt batch bolt to append
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
+        final Number singleTask = Integer.valueOf(1);
+        return addBolt(bolt, singleTask);
+    }
+    
+    /**
+     * Appends a rich bolt to the chain and returns a declarer for configuring it.
+     *
+     * @param bolt        bolt to append
+     * @param parallelism parallelism of the bolt; {@code null} means 1
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    @Deprecated
+    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
+        final int tasks = (parallelism == null) ? 1 : parallelism.intValue();
+        final Component added = new Component(bolt, tasks);
+        _components.add(added);
+        return new InputDeclarerImpl(added);
+    }
+    
+    /**
+     * Appends a rich bolt to the chain with the default parallelism (1).
+     *
+     * @param bolt bolt to append
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    @Deprecated
+    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
+        // A null parallelism makes the two-argument overload fall back to 1.
+        return addBolt(bolt, (Number) null);
+    }
+    
+    /**
+     * Appends a basic bolt to the chain, adapting it with a {@link BasicBoltExecutor}.
+     *
+     * @param bolt        basic bolt to append
+     * @param parallelism parallelism of the bolt; {@code null} means 1
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
+        final BasicBoltExecutor executor = new BasicBoltExecutor(bolt);
+        return addBolt(executor, parallelism);
+    }
+
+    /**
+     * Appends a basic bolt to the chain with the default parallelism (1).
+     *
+     * @param bolt basic bolt to append
+     * @return declarer for the bolt's input groupings and configuration
+     */
+    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
+        // A null parallelism ends up as 1 once the rich-bolt overload is reached.
+        return addBolt(bolt, (Number) null);
+    }
+        
+    /**
+     * Builds the topology with a spout that reads requests from the given in-process DRPC server.
+     *
+     * @param drpc local DRPC server to fetch requests from
+     * @return the assembled topology
+     */
+    public StormTopology createLocalTopology(ILocalDRPC drpc) {
+        final DRPCSpout localSpout = new DRPCSpout(_function, drpc);
+        return createTopology(localSpout);
+    }
+    
+    /**
+     * Builds the topology with a spout that reads requests from remote DRPC servers.
+     *
+     * @return the assembled topology
+     */
+    public StormTopology createRemoteTopology() {
+        final DRPCSpout remoteSpout = new DRPCSpout(_function);
+        return createTopology(remoteSpout);
+    }
+    
+    
+    /**
+     * Assembles the linear DRPC topology around the given spout:
+     * spout, then a {@code PrepareRequest} bolt, then the user bolts in the order they were
+     * added (each wrapped in a {@code CoordinatedBolt}), then a {@code JoinResult} bolt and
+     * finally a {@code ReturnResults} bolt.
+     *
+     * @param spout spout that supplies the DRPC requests
+     * @return the assembled topology
+     * @throws IllegalStateException if no bolt has been added to the builder
+     * @throws RuntimeException if the last bolt does not declare exactly one output stream
+     *         with exactly two fields
+     */
+    private StormTopology createTopology(DRPCSpout spout) {
+        // Without this check an empty builder fails further down with an index error from
+        // _components.get(-1) that says nothing about the actual problem.
+        if (_components.isEmpty()) {
+            throw new IllegalStateException("LinearDRPCTopology must contain at least one bolt");
+        }
+
+        final String SPOUT_ID = "spout";
+        final String PREPARE_ID = "prepare-request";
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout(SPOUT_ID, spout);
+        builder.setBolt(PREPARE_ID, new PrepareRequest())
+                .noneGrouping(SPOUT_ID);
+        // i is declared outside the loop because the ids of the trailing join/return bolts
+        // continue the numbering of the user bolts.
+        int i = 0;
+        for (; i < _components.size(); i++) {
+            Component component = _components.get(i);
+
+            // Coordination source: none for the first bolt, the previous bolt otherwise.
+            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
+            if (i == 1) {
+                source.put(boltId(i - 1), SourceArgs.single());
+            } else if (i >= 2) {
+                source.put(boltId(i - 1), SourceArgs.all());
+            }
+            // Only the last bolt, and only if it wants finished-callbacks, listens on the id stream.
+            IdStreamSpec idSpec = null;
+            if (i == _components.size() - 1 && component.bolt instanceof FinishedCallback) {
+                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
+            }
+            BoltDeclarer declarer = builder.setBolt(
+                    boltId(i),
+                    new CoordinatedBolt(component.bolt, source, idSpec),
+                    component.parallelism);
+
+            for (Map<String, Object> conf : component.componentConfs) {
+                declarer.addConfigurations(conf);
+            }
+
+            if (idSpec != null) {
+                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
+            }
+            if (i == 0 && component.declarations.isEmpty()) {
+                // First bolt without explicit groupings: subscribe to the prepared arguments.
+                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
+            } else {
+                String prevId;
+                if (i == 0) {
+                    prevId = PREPARE_ID;
+                } else {
+                    prevId = boltId(i - 1);
+                }
+                for (InputDeclaration declaration : component.declarations) {
+                    declaration.declare(prevId, declarer);
+                }
+            }
+            if (i > 0) {
+                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
+            }
+        }
+
+        // The join bolt needs to know the single output stream of the last user bolt.
+        IRichBolt lastBolt = _components.get(_components.size() - 1).bolt;
+        OutputFieldsGetter getter = new OutputFieldsGetter();
+        lastBolt.declareOutputFields(getter);
+        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
+        if (streams.size() != 1) {
+            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
+        }
+        String outputStream = streams.keySet().iterator().next();
+        List<String> fields = streams.get(outputStream).get_output_fields();
+        if (fields.size() != 2) {
+            throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
+        }
+
+        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
+                .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
+                .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
+        i++;
+        builder.setBolt(boltId(i), new ReturnResults())
+                .noneGrouping(boltId(i - 1));
+        return builder.createTopology();
+    }
+    
+    private static String boltId(int index) {
+        return "bolt" + index;
+    }
+    
+    /**
+     * One bolt of the chain together with its parallelism, its extra component
+     * configurations and the input declarations recorded for it.
+     */
+    private static class Component {
+        public IRichBolt bolt;
+        public int parallelism;
+        // Configuration maps later applied to the bolt's declarer.
+        public List<Map<String, Object>> componentConfs = new ArrayList<>();
+        // Groupings recorded for this bolt; replayed against the previous component's id.
+        public List<InputDeclaration> declarations = new ArrayList<>();
+
+        public Component(IRichBolt bolt, int parallelism) {
+            this.parallelism = parallelism;
+            this.bolt = bolt;
+        }
+    }
+    
+    /**
+     * A recorded grouping that is applied once the id of the preceding component is known.
+     */
+    private interface InputDeclaration {
+        /**
+         * Applies the recorded grouping.
+         *
+         * @param prevComponent id of the component to subscribe to
+         * @param declarer      declarer of the bolt that is subscribing
+         */
+        void declare(String prevComponent, InputDeclarer declarer);
+    }
+    
+    private static class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer {
+        Component _component;
+        
+        public InputDeclarerImpl(Component component) {
+            _component = component;
+        }
+        
+        @Override
+        public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(String prevComponent, InputDeclarer declarer) {
+                    declarer.fieldsGrouping(prevComponent, fields);
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(String prevComponent, InputDeclarer declarer) {
+                    declarer.fieldsGrouping(prevComponent, streamId, fields);
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public LinearDRPCInputDeclarer globalGrouping() {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(String prevComponent, InputDeclarer declarer) {
+                    declarer.globalGrouping(prevComponent);
+                }                
+            });
+            return this;
+        }
+
+        @Override
+        public LinearDRPCInputDeclarer globalGrouping(final String streamId) {
+            addDeclaration(new InputDeclaration() {
+                @Override
+                public void declare(String prevComponent, InputDeclarer declarer) {
+                    declarer.globalGrouping(prevComponent, streamId);
+                }                
+            });
+            return this;
+        }
+
+        /**
+         * Queues a shuffle grouping (no explicit stream) on the component later passed to declare().
+         *
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer shuffleGrouping() {
+            final InputDeclaration shuffle = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.shuffleGrouping(upstreamId);
+                }
+            };
+            addDeclaration(shuffle);
+            return this;
+        }
+
+        /**
+         * Queues a shuffle grouping on the given stream of the component later passed to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) {
+            final InputDeclaration shuffleOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.shuffleGrouping(upstreamId, streamId);
+                }
+            };
+            addDeclaration(shuffleOnStream);
+            return this;
+        }
+
+        /**
+         * Queues a local-or-shuffle grouping (no explicit stream) on the component later passed
+         * to declare().
+         *
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer localOrShuffleGrouping() {
+            final InputDeclaration localOrShuffle = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.localOrShuffleGrouping(upstreamId);
+                }
+            };
+            addDeclaration(localOrShuffle);
+            return this;
+        }
+
+        /**
+         * Queues a local-or-shuffle grouping on the given stream of the component later passed
+         * to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) {
+            final InputDeclaration localOrShuffleOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.localOrShuffleGrouping(upstreamId, streamId);
+                }
+            };
+            addDeclaration(localOrShuffleOnStream);
+            return this;
+        }
+        
+        /**
+         * Queues a none grouping (no explicit stream) on the component later passed to declare().
+         *
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer noneGrouping() {
+            final InputDeclaration none = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.noneGrouping(upstreamId);
+                }
+            };
+            addDeclaration(none);
+            return this;
+        }
+
+        /**
+         * Queues a none grouping on the given stream of the component later passed to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer noneGrouping(final String streamId) {
+            final InputDeclaration noneOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.noneGrouping(upstreamId, streamId);
+                }
+            };
+            addDeclaration(noneOnStream);
+            return this;
+        }
+
+        /**
+         * Queues an all grouping (no explicit stream) on the component later passed to declare().
+         *
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer allGrouping() {
+            final InputDeclaration all = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.allGrouping(upstreamId);
+                }
+            };
+            addDeclaration(all);
+            return this;
+        }
+
+        /**
+         * Queues an all grouping on the given stream of the component later passed to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer allGrouping(final String streamId) {
+            final InputDeclaration allOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.allGrouping(upstreamId, streamId);
+                }
+            };
+            addDeclaration(allOnStream);
+            return this;
+        }
+
+        /**
+         * Queues a direct grouping (no explicit stream) on the component later passed to declare().
+         *
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer directGrouping() {
+            final InputDeclaration direct = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.directGrouping(upstreamId);
+                }
+            };
+            addDeclaration(direct);
+            return this;
+        }
+
+        /**
+         * Queues a direct grouping on the given stream of the component later passed to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer directGrouping(final String streamId) {
+            final InputDeclaration directOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.directGrouping(upstreamId, streamId);
+                }
+            };
+            addDeclaration(directOnStream);
+            return this;
+        }
+
+        /**
+         * Shorthand for a custom grouping backed by a new PartialKeyGrouping over {@code fields}.
+         *
+         * @param fields fields handed to the PartialKeyGrouping
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) {
+            final CustomStreamGrouping partialKey = new PartialKeyGrouping(fields);
+            return customGrouping(partialKey);
+        }
+
+        /**
+         * Shorthand for a custom grouping on {@code streamId} backed by a new PartialKeyGrouping
+         * over {@code fields}.
+         *
+         * @param streamId stream to subscribe to
+         * @param fields   fields handed to the PartialKeyGrouping
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) {
+            final CustomStreamGrouping partialKey = new PartialKeyGrouping(fields);
+            return customGrouping(streamId, partialKey);
+        }
+
+        /**
+         * Queues a custom grouping (no explicit stream) on the component later passed to declare().
+         *
+         * @param grouping grouping implementation to apply
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) {
+            final InputDeclaration custom = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.customGrouping(upstreamId, grouping);
+                }
+            };
+            addDeclaration(custom);
+            return this;
+        }
+
+        /**
+         * Queues a custom grouping on the given stream of the component later passed to declare().
+         *
+         * @param streamId stream to subscribe to
+         * @param grouping grouping implementation to apply
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer customGrouping(final String streamId, final CustomStreamGrouping grouping) {
+            final InputDeclaration customOnStream = new InputDeclaration() {
+                @Override
+                public void declare(String upstreamId, InputDeclarer target) {
+                    target.customGrouping(upstreamId, streamId, grouping);
+                }
+            };
+            addDeclaration(customOnStream);
+            return this;
+        }
+        
+        /**
+         * Appends a declaration to the owning component's list of declarations.
+         *
+         * @param declaration the declaration to record
+         */
+        private void addDeclaration(InputDeclaration declaration) {
+            _component.declarations.add(declaration);
+        }
+
+        /**
+         * Appends a configuration map to the owning component's list of component configs.
+         *
+         * @param conf configuration entries to record
+         * @return this declarer, for chaining
+         */
+        @Override
+        public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) {
+            _component.componentConfs.add(conf);
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
new file mode 100644
index 0000000..06e576c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.storm.utils.Utils;
+
+
+/**
+ * Bolt that tags each incoming request with a random request id and fans it out on three
+ * streams: the arguments, the return info, and the bare id.
+ */
+public class PrepareRequest extends BaseBasicBolt {
+    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
+    public static final String RETURN_STREAM = "ret";
+    public static final String ID_STREAM = "id";
+
+    Random rand;
+
+    /**
+     * Creates the random generator used to produce request ids.
+     */
+    @Override
+    public void prepare(Map map, TopologyContext context) {
+        rand = new Random();
+    }
+
+    /**
+     * Reads the arguments (field 0) and return info (field 1) of the tuple as strings, draws a
+     * random long as request id, and emits one tuple on each of the three output streams.
+     *
+     * @param tuple     incoming request tuple
+     * @param collector collector the three tuples are emitted on
+     */
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        String args = tuple.getString(0);
+        String returnInfo = tuple.getString(1);
+        long requestId = rand.nextLong();
+        collector.emit(ARGS_STREAM, new Values(requestId, args));
+        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
+        collector.emit(ID_STREAM, new Values(requestId));
+    }
+
+    /**
+     * Declares the three output streams; every stream carries the request id as its first field.
+     *
+     * @param declarer declarer the streams are registered with
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
+        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
+        declarer.declareStream(ID_STREAM, new Fields("request"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
new file mode 100644
index 0000000..8f03356
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.drpc;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServiceRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+
+
+/**
+ * Bolt that delivers a DRPC result back to the DRPC server named in the tuple's return info.
+ *
+ * <p>Input tuples carry the result string in field 0 and a JSON return-info string in field 1
+ * with the keys {@code host}, {@code port} and {@code id}.
+ */
+public class ReturnResults extends BaseRichBolt {
+    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+    static final long serialVersionUID = -774882142710631591L;
+
+    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
+    OutputCollector _collector;
+    boolean local;
+    Map _conf; 
+    // Remote clients keyed by a two-element [host, port] list.
+    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
+
+    /**
+     * Stores the configuration and collector and detects whether the cluster mode is "local".
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        _conf = stormConf;
+        _collector = collector;
+        // Constant-first comparison: a missing cluster-mode entry means "not local" rather than an NPE.
+        local = "local".equals(stormConf.get(Config.STORM_CLUSTER_MODE));
+    }
+
+    /**
+     * Sends the result to the DRPC server described by the return info, trying up to three
+     * times. The tuple is acked on success and failed when the return info cannot be parsed,
+     * when the call is not authorized, or when all attempts fail.
+     *
+     * @param input tuple with the result in field 0 and the JSON return info in field 1
+     * @throws RuntimeException if a remote client cannot be created or the call is not authorized
+     */
+    @Override
+    public void execute(Tuple input) {
+        String result = (String) input.getValue(0);
+        String returnInfo = (String) input.getValue(1);
+        // NOTE(review): a tuple without return info is neither acked nor failed here; this
+        // matches the previous behavior and is left unchanged.
+        if (returnInfo!=null) {
+            Map retMap = null;
+            try {
+                retMap = (Map) JSONValue.parseWithException(returnInfo);
+            } catch (ParseException e) {
+                 LOG.error("Parseing returnInfo failed", e);
+                 _collector.fail(input);
+                 return;
+            }
+            final String host = (String) retMap.get("host");
+            final int port = ObjectReader.getInt(retMap.get("port"));
+            String id = (String) retMap.get("id");
+            DistributedRPCInvocations.Iface client;
+            if (local) {
+                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
+            } else {
+                // Plain list as cache key; equal to any other list holding the same host and port.
+                List<Object> server = new ArrayList<Object>(2);
+                server.add(host);
+                server.add(port);
+
+                DRPCInvocationsClient cached = _clients.get(server);
+                if (cached == null) {
+                    try {
+                        cached = new DRPCInvocationsClient(_conf, host, port);
+                    } catch (TTransportException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    _clients.put(server, cached);
+                }
+                client = cached;
+            }
+ 
+
+            int retryCnt = 0;
+            int maxRetries = 3;
+            while (retryCnt < maxRetries) {
+                retryCnt++;
+                try {
+                    client.result(id, result);
+                    _collector.ack(input);
+                    break;
+                } catch (AuthorizationException aze) {
+                    LOG.error("Not authorized to return results to DRPC server", aze);
+                    _collector.fail(input);
+                    throw new RuntimeException(aze);
+                } catch (TException tex) {
+                    if (retryCnt >= maxRetries) {
+                        LOG.error("Failed to return results to DRPC server", tex);
+                        _collector.fail(input);
+                    }
+                    // No cast here: in local mode the client is not a DRPCInvocationsClient.
+                    reconnectClient(client);
+                }
+            }
+        }
+    }    
+
+    /**
+     * Reconnects the client if it is a DRPCInvocationsClient; any other client is left alone.
+     * A failed reconnect is logged and otherwise ignored so the caller can keep retrying.
+     *
+     * @param client the client that just failed
+     */
+    private void reconnectClient(DistributedRPCInvocations.Iface client) {
+        if (client instanceof DRPCInvocationsClient) {
+            try {
+                LOG.info("reconnecting... ");
+                ((DRPCInvocationsClient) client).reconnectClient(); //Blocking call
+            } catch (TException e2) {
+                LOG.error("Failed to connect to DRPC server", e2);
+            }
+        }
+    }
+
+    /**
+     * Closes every cached remote client.
+     */
+    @Override
+    public void cleanup() {
+        for(DRPCInvocationsClient c: _clients.values()) {
+            c.close();
+        }
+    }
+
+    /**
+     * Declares nothing.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
new file mode 100644
index 0000000..d89ac69
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Base class of the executors built by {@code mkExecutor}, which instantiates either a
+ * SpoutExecutor or a BoltExecutor depending on whether the component is a spout or a bolt.
+ */
+public abstract class Executor implements Callable, EventHandler<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+    protected final WorkerState workerData;
+    protected final WorkerTopologyContext workerTopologyContext;
+    protected final List<Long> executorId;
+    protected final List<Integer> taskIds;
+    protected final String componentId;
+    protected final AtomicBoolean openOrPrepareWasCalled;
+    protected final Map stormConf;
+    protected final Map conf;
+    protected final String stormId;
+    protected final HashMap sharedExecutorData;
+    protected final AtomicBoolean stormActive;
+    protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+    protected final Runnable suicideFn;
+    protected final IStormClusterState stormClusterState;
+    protected final Map<Integer, String> taskToComponent;
+    // Not final: set in the constructor and assigned again by mkExecutor.
+    protected CommonStats stats;
+    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+    protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+    protected final ReportErrorAndDie reportErrorDie;
+    protected final Callable<Boolean> sampler;
+    protected ExecutorTransfer executorTransfer;
+    // Either StatsUtil.SPOUT or StatsUtil.BOLT.
+    protected final String type;
+    protected final AtomicBoolean throttleOn;
+
+    protected final IReportError reportError;
+    protected final Random rand;
+    // Created per executor by mkExecutorBatchQueue.
+    protected final DisruptorQueue transferQueue;
+    // Looked up from the worker's executor receive-queue map.
+    protected final DisruptorQueue receiveQueue;
+    // Populated by mkExecutor after construction.
+    protected Map<Integer, Task> idToTask;
+    protected final Map<String, String> credentials;
+    protected final Boolean isDebug;
+    protected final Boolean hasEventLoggers;
+    // Empty string when the local hostname cannot be resolved.
+    protected String hostname;
+
+    /**
+     * Initializes the state shared by all executors from the worker state.
+     *
+     * @param workerData  worker-level state the context, queues and configuration are read from
+     * @param executorId  executor id; expanded to task ids via StormCommon.executorIdToTasks
+     * @param credentials credentials stored on this executor as given
+     * @throws RuntimeException if the component is neither a spout nor a bolt of the raw topology;
+     *                          failures creating the cluster state are rethrown wrapped
+     */
+    protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
+        this.workerData = workerData;
+        this.executorId = executorId;
+        this.workerTopologyContext = workerData.getWorkerTopologyContext();
+        this.taskIds = StormCommon.executorIdToTasks(executorId);
+        // The component is resolved from the executor's first task id.
+        this.componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+        this.openOrPrepareWasCalled = new AtomicBoolean(false);
+        this.stormConf = normalizedComponentConf(workerData.getTopologyConf(), workerTopologyContext, componentId);
+        this.receiveQueue = (workerData.getExecutorReceiveQueueMap().get(executorId));
+        this.stormId = workerData.getTopologyId();
+        this.conf = workerData.getConf();
+        this.sharedExecutorData = new HashMap();
+        this.stormActive = workerData.getIsTopologyActive();
+        this.stormComponentDebug = workerData.getStormComponentToDebug();
+
+        // The transfer queue must exist before the ExecutorTransfer that wraps it.
+        this.transferQueue = mkExecutorBatchQueue(stormConf, executorId);
+        this.executorTransfer = new ExecutorTransfer(workerData, transferQueue, stormConf);
+
+        this.suicideFn = workerData.getSuicideCallback();
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.getStateStorage(), Utils.getWorkerACL(stormConf),
+                    new ClusterStateContext(DaemonType.WORKER));
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        // Decide spout vs. bolt by looking the component up in the raw topology.
+        StormTopology topology = workerTopologyContext.getRawTopology();
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        if (spouts.containsKey(componentId)) {
+            this.type = StatsUtil.SPOUT;
+            this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(stormConf),ObjectReader.getInt(stormConf.get(Config.NUM_STAT_BUCKETS)));
+        } else if (bolts.containsKey(componentId)) {
+            this.type = StatsUtil.BOLT;
+            this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(stormConf),ObjectReader.getInt(stormConf.get(Config.NUM_STAT_BUCKETS)));
+        } else {
+            throw new RuntimeException("Could not find " + componentId + " in " + topology);
+        }
+
+        this.intervalToTaskToMetricToRegistry = new HashMap<>();
+        this.taskToComponent = workerData.getTaskToComponent();
+        this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, stormConf);
+        this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
+        this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
+        this.sampler = ConfigUtils.mkStatsSampler(stormConf);
+        this.throttleOn = workerData.getThrottleOn();
+        this.isDebug = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+        this.rand = new Random(Utils.secureRandomLong());
+        this.credentials = credentials;
+        this.hasEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+        // A failed hostname lookup is tolerated; an empty hostname is used instead.
+        try {
+            this.hostname = Utils.hostname();
+        } catch (UnknownHostException ignored) {
+            this.hostname = "";
+        }
+    }
+
+    /**
+     * Builds the executor for {@code executorId}: a SpoutExecutor when the component is a spout,
+     * otherwise a BoltExecutor. One Task is created per task id, and a "startup" tuple is sent
+     * unanchored on the system stream for each of them.
+     *
+     * @param workerState worker state the executor is created from
+     * @param executorId  id of the executor to create
+     * @param credentials credentials passed to the executor
+     * @return the executor with its stats and task map set
+     */
+    public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
+        final WorkerTopologyContext topologyContext = workerState.getWorkerTopologyContext();
+        final List<Integer> executorTaskIds = StormCommon.executorIdToTasks(executorId);
+        final String component = topologyContext.getComponentId(executorTaskIds.get(0));
+        final boolean isSpout = StatsUtil.SPOUT.equals(getExecutorType(topologyContext, component));
+
+        final Executor built;
+        if (isSpout) {
+            built = new SpoutExecutor(workerState, executorId, credentials);
+            built.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(built.getStormConf()),ObjectReader.getInt(built.getStormConf().get(Config.NUM_STAT_BUCKETS)));
+        } else {
+            built = new BoltExecutor(workerState, executorId, credentials);
+            built.stats = new BoltExecutorStats(ConfigUtils.samplingRate(built.getStormConf()),ObjectReader.getInt(built.getStormConf().get(Config.NUM_STAT_BUCKETS)));
+        }
+
+        final Map<Integer, Task> tasksById = new HashMap<>();
+        for (Integer id : executorTaskIds) {
+            try {
+                final Task created = new Task(built, id);
+                built.sendUnanchored(
+                        created, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), built.getExecutorTransfer());
+                tasksById.put(id, created);
+            } catch (IOException ioe) {
+                throw Utils.wrapInRuntime(ioe);
+            }
+        }
+        built.idToTask = tasksById;
+
+        return built;
+    }
+
+    /**
+     * Classifies a component as spout or bolt by looking it up in the raw topology.
+     *
+     * @return StatsUtil.SPOUT or StatsUtil.BOLT
+     * @throws RuntimeException if the component is in neither map
+     */
+    private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
+        final StormTopology rawTopology = workerTopologyContext.getRawTopology();
+        final Map<String, SpoutSpec> spoutSpecs = rawTopology.get_spouts();
+        final Map<String, Bolt> boltSpecs = rawTopology.get_bolts();
+
+        if (spoutSpecs.containsKey(componentId)) {
+            return StatsUtil.SPOUT;
+        }
+        if (boltSpecs.containsKey(componentId)) {
+            return StatsUtil.BOLT;
+        }
+        throw new RuntimeException("Could not find " + componentId + " in " + rawTopology);
+    }
+
+    /**
+     * Registers backpressure, creates the transfer and handler loops through Utils.asyncLoop and
+     * sets up tick tuples. Kept separate from mkExecutor so that the executor transfer in the
+     * executor data can be replaced for testing.
+     *
+     * @return shutdown handle built from this executor, the two loop threads and the task map
+     */
+    public ExecutorShutdown execute() throws Exception {
+        LOG.info("Loading executor tasks " + componentId + ":" + executorId);
+
+        registerBackpressure();
+
+        final String transferName = executorTransfer.getName();
+        final Utils.SmartThread transferThread = Utils.asyncLoop(executorTransfer, transferName, reportErrorDie);
+
+        final String handlerName = componentId + "-executor" + executorId;
+        final Utils.SmartThread handlerThread =
+                Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
+
+        final boolean isSpout = StatsUtil.SPOUT.equals(type);
+        setupTicks(isSpout);
+
+        LOG.info("Finished loading executor " + componentId + ":" + executorId);
+        return new ExecutorShutdown(this, Lists.newArrayList(transferThread, handlerThread), idToTask);
+    }
+
+    /**
+     * Processes one tuple on behalf of one task; called by onEvent for every addressed tuple,
+     * and once per local task id for broadcast tuples.
+     *
+     * @param taskId id of the task the tuple is processed for
+     * @param tuple  the tuple to process
+     */
+    public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
+
+    /**
+     * Handles one batch of addressed tuples: each tuple goes to its destination task, or to
+     * every task of this executor when it is addressed to the broadcast destination.
+     *
+     * @param event      the batch; must be an ArrayList of AddressedTuple
+     * @param seq        unused
+     * @param endOfBatch unused
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
+        final ArrayList<AddressedTuple> batch = (ArrayList<AddressedTuple>) event;
+        for (AddressedTuple addressed : batch) {
+            final TupleImpl payload = (TupleImpl) addressed.getTuple();
+            final int destTask = addressed.getDest();
+            if (isDebug) {
+                LOG.info("Processing received message FOR {} TUPLE: {}", destTask, payload);
+            }
+            if (destTask == AddressedTuple.BROADCAST_DEST) {
+                for (Integer localTask : taskIds) {
+                    tupleActionFn(localTask, payload);
+                }
+                continue;
+            }
+            tupleActionFn(destTask, payload);
+        }
+    }
+
+    /**
+     * Collects and resets the metrics registered for the tuple's interval and the given task,
+     * and sends the non-null values unanchored on the metrics stream. Does nothing when no
+     * metrics are registered for that interval and task, or when every value is null.
+     *
+     * @param taskData task whose metrics are reported
+     * @param tuple    metrics tick tuple carrying the interval in field 0
+     */
+    public void metricsTick(Task taskData, TupleImpl tuple) {
+        try {
+            final Integer interval = tuple.getInteger(0);
+            final int taskId = taskData.getTaskId();
+
+            final Map<Integer, Map<String, IMetric>> metricsByTask = intervalToTaskToMetricToRegistry.get(interval);
+            if (metricsByTask == null) {
+                return;
+            }
+            final Map<String, IMetric> metricsByName = metricsByTask.get(taskId);
+            if (metricsByName == null) {
+                return;
+            }
+
+            final IMetricsConsumer.TaskInfo info = new IMetricsConsumer.TaskInfo(
+                    hostname, workerTopologyContext.getThisWorkerPort(),
+                    componentId, taskId, Time.currentTimeSecs(), interval);
+
+            final List<IMetricsConsumer.DataPoint> points = new ArrayList<>();
+            for (Map.Entry<String, IMetric> named : metricsByName.entrySet()) {
+                final Object current = named.getValue().getValueAndReset();
+                if (current == null) {
+                    continue;
+                }
+                points.add(new IMetricsConsumer.DataPoint(named.getKey(), current));
+            }
+
+            if (points.isEmpty()) {
+                return;
+            }
+            sendUnanchored(taskData, Constants.METRICS_STREAM_ID,
+                    new Values(info, points), executorTransfer);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    /**
+     * For every registered metrics interval, schedules a recurring job on the worker's user
+     * timer that publishes a broadcast metrics-tick tuple to the receive queue.
+     */
+    protected void setupMetrics() {
+        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
+            final StormTimer userTimer = workerData.getUserTimer();
+            final Runnable publishMetricsTick = new Runnable() {
+                @Override
+                public void run() {
+                    final TupleImpl tick = new TupleImpl(workerTopologyContext, new Values(interval),
+                            (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+                    final AddressedTuple broadcast = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tick);
+                    final List<AddressedTuple> batch = Lists.newArrayList(broadcast);
+                    receiveQueue.publish(batch);
+                }
+            };
+            userTimer.scheduleRecurring(interval, interval, publishMetricsTick);
+        }
+    }
+
+    /**
+     * Builds a tuple for {@code stream} from {@code values} and transfers it to every outgoing
+     * task the given task reports for that stream.
+     *
+     * @param task     task that creates the tuple and resolves the outgoing tasks
+     * @param stream   stream the tuple is sent on
+     * @param values   tuple values
+     * @param transfer transfer the tuple is handed to, once per outgoing task
+     */
+    public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
+        final Tuple outgoing = task.getTuple(stream, values);
+        final List<Integer> targets = task.getOutgoingTasks(stream, values);
+        for (Integer target : targets) {
+            transfer.transfer(target, outgoing);
+        }
+    }
+
+    /**
+     * Sends sampled data to the event logger when debugging is enabled for the component, or
+     * failing that for the topology id (set via nimbus api). The sampling percentage of the
+     * matching debug options decides how often a call actually sends.
+     */
+    public void sendToEventLogger(Executor executor, Task taskData, List values,
+                                  String componentId, Object messageId, Random random) {
+        final Map<String, DebugOptions> debugByKey = executor.getStormComponentDebug().get();
+        DebugOptions options = debugByKey.get(componentId);
+        if (options == null) {
+            // No component-level entry: fall back to the entry stored under the topology id.
+            options = debugByKey.get(executor.getStormId());
+        }
+
+        double samplingPct = 0;
+        if ((options != null) && (options.is_enable())) {
+            samplingPct = options.get_samplingpct();
+        }
+
+        final boolean sampled = samplingPct > 0 && (random.nextDouble() * 100) < samplingPct;
+        if (sampled) {
+            sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
+                    new Values(componentId, messageId, System.currentTimeMillis(), values),
+                    executor.getExecutorTransfer());
+        }
+    }
+
+    /**
+     * Registers a callback on the receive queue that notifies the worker's backpressure checker
+     * at both water marks, then applies the configured water marks and backpressure switch.
+     */
+    private void registerBackpressure() {
+        final DisruptorBackpressureCallback notifyChecker = new DisruptorBackpressureCallback() {
+            @Override
+            public void highWaterMark() throws Exception {
+                LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+                WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
+            }
+
+            @Override
+            public void lowWaterMark() throws Exception {
+                LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+                WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
+            }
+        };
+        receiveQueue.registerBackpressureCallback(notifyChecker);
+
+        final double highMark = ObjectReader.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK));
+        receiveQueue.setHighWaterMark(highMark);
+        final double lowMark = ObjectReader.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK));
+        receiveQueue.setLowWaterMark(lowMark);
+        final boolean enabled = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+        receiveQueue.setEnableBackpressure(enabled);
+    }
+
+    /**
+     * Schedules recurring tick tuples for this executor when a tick frequency is configured.
+     * <p>
+     * No ticks are scheduled for system components, nor for spouts when message timeouts
+     * are disabled. Otherwise a tuple carrying the tick frequency is published to the
+     * receive queue every {@code tickTimeSecs} seconds, addressed as a broadcast from the
+     * system task on the system tick stream.
+     *
+     * @param isSpout whether this executor runs a spout
+     */
+    protected void setupTicks(boolean isSpout) {
+        final Integer tickTimeSecs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
+        if (tickTimeSecs == null) {
+            return;
+        }
+        // Null-safe read: a plain (Boolean) cast would throw a NullPointerException on
+        // unboxing when the key is absent.
+        // NOTE(review): assumes message timeouts are enabled when the key is absent - confirm.
+        boolean enableMessageTimeout =
+                ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS), true);
+        if (Utils.isSystemId(componentId) || (!enableMessageTimeout && isSpout)) {
+            LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
+            return;
+        }
+        StormTimer timerTask = workerData.getUserTimer();
+        timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable() {
+            @Override
+            public void run() {
+                TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
+                        (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+                List<AddressedTuple> tickTuple =
+                        Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+                receiveQueue.publish(tickTuple);
+            }
+        });
+    }
+
+
+    private DisruptorQueue mkExecutorBatchQueue(Map stormConf, List<Long> executorId) {
+        int sendSize = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+        int waitTimeOutMs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
+        int batchSize = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
+        int batchTimeOutMs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+        return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
+                sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+    }
+
+    /**
+     * Builds the lookup of stream id to target component id to grouper for a component.
+     * <p>
+     * Streams with at least one target component map to their groupers. Every other stream
+     * declared by the component is still present as a key, mapped to {@code null}.
+     *
+     * @param workerTopologyContext context used to resolve targets, output fields and tasks
+     * @param componentId           id of the emitting component
+     * @param stormConf             configuration handed to the grouper factory
+     * @return map of stream id to component id to grouper
+     */
+    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(
+            WorkerTopologyContext workerTopologyContext, String componentId, Map stormConf) {
+        Map<String, Map<String, LoadAwareCustomStreamGrouping>> groupersByStream = new HashMap<>();
+
+        Map<String, Map<String, Grouping>> targetsByStream = workerTopologyContext.getTargets(componentId);
+        for (Map.Entry<String, Map<String, Grouping>> streamEntry : targetsByStream.entrySet()) {
+            String streamId = streamEntry.getKey();
+            Map<String, Grouping> groupingByTarget = streamEntry.getValue();
+            Fields outFields = workerTopologyContext.getComponentOutputFields(componentId, streamId);
+            Map<String, LoadAwareCustomStreamGrouping> grouperByTarget = new HashMap<>();
+            for (Map.Entry<String, Grouping> targetEntry : groupingByTarget.entrySet()) {
+                String targetComponent = targetEntry.getKey();
+                List<Integer> targetTasks = workerTopologyContext.getComponentTasks(targetComponent);
+                grouperByTarget.put(targetComponent, GrouperFactory.mkGrouper(
+                        workerTopologyContext, componentId, streamId, outFields,
+                        targetEntry.getValue(), targetTasks, stormConf));
+            }
+            if (!grouperByTarget.isEmpty()) {
+                groupersByStream.put(streamId, grouperByTarget);
+            }
+        }
+
+        // Declared streams without any grouper are recorded with a null value.
+        for (String declaredStream : workerTopologyContext.getComponentCommon(componentId).get_streams().keySet()) {
+            if (groupersByStream.containsKey(declaredStream)) {
+                continue;
+            }
+            groupersByStream.put(declaredStream, null);
+        }
+
+        return groupersByStream;
+    }
+
+    /**
+     * Merges the topology configuration with the component's own JSON configuration.
+     * <p>
+     * Every key found among the {@link Config} fields is dropped from the component's
+     * configuration, except for the keys listed below. The remaining component entries
+     * are layered over a copy of {@code stormConf}.
+     *
+     * @param stormConf       base configuration
+     * @param topologyContext context used to read the component's JSON configuration
+     * @param componentId     id of the component being configured
+     * @return a new map holding {@code stormConf} overridden by the filtered component entries
+     * @throws RuntimeException if the component's JSON configuration cannot be parsed
+     */
+    private Map normalizedComponentConf(Map stormConf, WorkerTopologyContext topologyContext, String componentId) {
+        // Keys a component is allowed to override; each is taken out of the removal list.
+        Object[] overridableKeys = {
+            Config.TOPOLOGY_DEBUG,
+            Config.TOPOLOGY_MAX_SPOUT_PENDING,
+            Config.TOPOLOGY_MAX_TASK_PARALLELISM,
+            Config.TOPOLOGY_TRANSACTIONAL_ID,
+            Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+            Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS,
+            Config.TOPOLOGY_SPOUT_WAIT_STRATEGY,
+            Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT,
+            Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS,
+            Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT,
+            Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS,
+            Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS,
+            Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME,
+            Config.TOPOLOGY_STATE_PROVIDER,
+            Config.TOPOLOGY_STATE_PROVIDER_CONFIG,
+            Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM
+        };
+        List<Object> keysToRemove = All_CONFIGS();
+        for (Object overridable : overridableKeys) {
+            keysToRemove.remove(overridable);
+        }
+
+        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
+        Map<Object, Object> componentConf;
+        if (specJsonConf == null) {
+            componentConf = new HashMap<>();
+        } else {
+            try {
+                componentConf = (Map<Object, Object>) JSONValue.parseWithException(specJsonConf);
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
+            }
+            componentConf.keySet().removeAll(keysToRemove);
+        }
+
+        Map<Object, Object> merged = new HashMap<>();
+        merged.putAll(stormConf);
+        merged.putAll(componentConf);
+        return merged;
+    }
+
+    // =============================================================================
+    // ============================ getter methods =================================
+    // =============================================================================
+
+    /** Returns this executor's id. */
+    public List<Long> getExecutorId() {
+        return executorId;
+    }
+
+    /** Returns the ids of the tasks held by this executor. */
+    public List<Integer> getTaskIds() {
+        return taskIds;
+    }
+
+    /** Returns the id of the component this executor runs. */
+    public String getComponentId() {
+        return componentId;
+    }
+
+    /** Returns the flag recording whether open or prepare was called. */
+    public AtomicBoolean getOpenOrPrepareWasCalled() {
+        return openOrPrepareWasCalled;
+    }
+
+    /** Returns the configuration map held by this executor. */
+    public Map getStormConf() {
+        return stormConf;
+    }
+
+    /** Returns the storm id held by this executor. */
+    public String getStormId() {
+        return stormId;
+    }
+
+    /** Returns the statistics object of this executor. */
+    public CommonStats getStats() {
+        return stats;
+    }
+
+    /** Returns the throttle-on flag held by this executor. */
+    public AtomicBoolean getThrottleOn() {
+        return throttleOn;
+    }
+
+    /** Returns the type string of this executor. */
+    public String getType() {
+        return type;
+    }
+
+    /** Returns the debug flag held by this executor. */
+    public Boolean getIsDebug() {
+        return isDebug;
+    }
+
+    /** Returns the transfer currently used by this executor. */
+    public ExecutorTransfer getExecutorTransfer() {
+        return executorTransfer;
+    }
+
+    /** Returns the error reporter of this executor. */
+    public IReportError getReportError() {
+        return reportError;
+    }
+
+    /** Returns the worker topology context held by this executor. */
+    public WorkerTopologyContext getWorkerTopologyContext() {
+        return workerTopologyContext;
+    }
+
+    /** Returns the sampler callable held by this executor. */
+    public Callable<Boolean> getSampler() {
+        return sampler;
+    }
+
+    /** Returns the reference holding the per-key debug options. */
+    public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() {
+        return stormComponentDebug;
+    }
+
+    /** Returns the receive queue of this executor. */
+    public DisruptorQueue getReceiveQueue() {
+        return receiveQueue;
+    }
+
+    /** Returns the throttle-on state reported by the receive queue. */
+    public boolean getBackpressure() {
+        return receiveQueue.getThrottleOn();
+    }
+
+    /** Returns the transfer queue held by this executor. */
+    public DisruptorQueue getTransferWorkerQueue() {
+        return transferQueue;
+    }
+
+    /** Returns the cluster state handle held by this executor. */
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    /** Returns the worker state this executor belongs to. */
+    public WorkerState getWorkerData() {
+        return workerData;
+    }
+
+    /** Returns the map of stream id to component id to grouper. */
+    public Map<String, Map<String, LoadAwareCustomStreamGrouping>> getStreamToComponentToGrouper() {
+        return streamToComponentToGrouper;
+    }
+
+    /** Returns the shared executor data map. */
+    public HashMap getSharedExecutorData() {
+        return sharedExecutorData;
+    }
+
+    /** Returns the metric registry, keyed by interval, then task, then metric name. */
+    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
+        return intervalToTaskToMetricToRegistry;
+    }
+
+    /**
+     * Replaces the transfer used by this executor.
+     *
+     * @param executorTransfer the transfer to use from now on
+     */
+    @VisibleForTesting
+    public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
+        this.executorTransfer = executorTransfer;
+    }
+
+    private static List<Object> All_CONFIGS() {
+        List<Object> ret = new ArrayList<Object>();
+        Config config = new Config();
+        Class<?> ConfigClass = config.getClass();
+        Field[] fields = ConfigClass.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            try {
+                Object obj = fields[i].get(null);
+                ret.add(obj);
+            } catch (IllegalArgumentException e) {
+                LOG.error(e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
new file mode 100644
index 0000000..144ee1b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Constants;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
+    private final Executor executor;
+    private final List<Utils.SmartThread> threads;
+    private final Map<Integer, Task> taskDatas;
+
+    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+        this.executor = executor;
+        this.threads = threads;
+        this.taskDatas = taskDatas;
+    }
+
+    @Override
+    public ExecutorStats renderStats() {
+        return executor.getStats().renderStats();
+    }
+
+    @Override
+    public List<Long> getExecutorId() {
+        return executor.getExecutorId();
+    }
+
+    @Override
+    public void credentialsChanged(Credentials credentials) {
+        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), (int) Constants.SYSTEM_TASK_ID,
+                Constants.CREDENTIALS_CHANGED_STREAM_ID);
+        List<AddressedTuple> addressedTuple = Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+        executor.getReceiveQueue().publish(addressedTuple);
+    }
+
+    @Override
+    public boolean getBackPressureFlag() {
+        return executor.getBackpressure();
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+            executor.getReceiveQueue().haltWithInterrupt();
+            executor.getTransferWorkerQueue().haltWithInterrupt();
+            for (Utils.SmartThread t : threads) {
+                t.interrupt();
+            }
+            for (Utils.SmartThread t : threads) {
+                LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
+                t.join();
+            }
+            executor.getStats().cleanupStats();
+            for (Task task : taskDatas.values()) {
+                TopologyContext userContext = task.getUserContext();
+                for (ITaskHook hook : userContext.getHooks()) {
+                    hook.cleanup();
+                }
+            }
+            executor.getStormClusterState().disconnect();
+            if (executor.getOpenOrPrepareWasCalled().get()) {
+                for (Task task : taskDatas.values()) {
+                    Object object = task.getTaskObject();
+                    if (object instanceof ISpout) {
+                        ((ISpout) object).close();
+                    } else if (object instanceof IBolt) {
+                        ((IBolt) object).cleanup();
+                    } else {
+                        LOG.error("unknown component object");
+                    }
+                }
+            }
+            LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
new file mode 100644
index 0000000..0012e4d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.EventHandler;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Publishes addressed tuples to a batch transfer queue and, acting as that queue's event
+ * handler, hands each completed batch to the worker for transfer.
+ */
+public class ExecutorTransfer implements EventHandler, Callable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
+
+    private final WorkerState workerData;
+    private final DisruptorQueue batchTransferQueue;
+    private final Map stormConf;
+    private final KryoTupleSerializer serializer;
+    private final MutableObject cachedEmit;
+    private final boolean isDebug;
+
+    /**
+     * @param workerData         worker state that receives the batched tuples
+     * @param batchTransferQueue queue tuples are published to and consumed from
+     * @param stormConf          configuration used for the serializer and the debug flag
+     */
+    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map stormConf) {
+        this.workerData = workerData;
+        this.batchTransferQueue = batchTransferQueue;
+        this.stormConf = stormConf;
+        this.serializer = new KryoTupleSerializer(stormConf, workerData.getWorkerTopologyContext());
+        this.cachedEmit = new MutableObject(new ArrayList<>());
+        // Debug logging is off when the key is absent.
+        this.isDebug = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+    }
+
+    /**
+     * Wraps the tuple with its destination task and publishes it to the batch transfer queue.
+     *
+     * @param task  destination task id
+     * @param tuple tuple to send
+     */
+    public void transfer(int task, Tuple tuple) {
+        AddressedTuple addressed = new AddressedTuple(task, tuple);
+        if (isDebug) {
+            LOG.info("TRANSFERRING tuple {}", addressed);
+        }
+        batchTransferQueue.publish(addressed);
+    }
+
+    /** Returns the queue tuples are published to. */
+    @VisibleForTesting
+    public DisruptorQueue getBatchTransferQueue() {
+        return this.batchTransferQueue;
+    }
+
+    /**
+     * Consumes one available batch from the transfer queue with this object as handler.
+     *
+     * @return always {@code 0L}
+     */
+    @Override
+    public Object call() throws Exception {
+        batchTransferQueue.consumeBatchWhenAvailable(this);
+        return 0L;
+    }
+
+    /** Returns the name of the underlying batch transfer queue. */
+    public String getName() {
+        return batchTransferQueue.getName();
+    }
+
+    /**
+     * Buffers the event; at the end of a batch, passes the buffered events to the worker
+     * and starts a fresh buffer.
+     */
+    @Override
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
+        ArrayList pending = (ArrayList) cachedEmit.getObject();
+        pending.add(event);
+        if (!endOfBatch) {
+            return;
+        }
+        workerData.transfer(serializer, pending);
+        // A new list is installed rather than clearing the one just handed to the worker.
+        cachedEmit.setObject(new ArrayList<>());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
new file mode 100644
index 0000000..441fdff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+
+import java.util.List;
+
+/**
+ * Operations exposed by a running executor.
+ */
+public interface IRunningExecutor {
+
+    /** Returns the executor's statistics as an {@link ExecutorStats} object. */
+    ExecutorStats renderStats();
+
+    /** Returns the id of the executor. */
+    List<Long> getExecutorId();
+
+    /**
+     * Notifies the executor of changed credentials.
+     *
+     * @param credentials the new credentials
+     */
+    void credentialsChanged(Credentials credentials);
+
+    /** Returns the executor's backpressure flag. */
+    boolean getBackPressureFlag();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
new file mode 100644
index 0000000..4f2811b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.executor;
+
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RegisteredGlobalState;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Factory for executors whose transfer additionally counts transferred tuples while a
+ * track id is set.
+ */
+public class LocalExecutor {
+
+    private static volatile String trackId = null;
+
+    /**
+     * Creates an executor and installs a transfer that, while a track id is set, increments
+     * the {@code "transferred"} counter of the registered global state before delegating
+     * to the regular transfer.
+     *
+     * @param workerState        worker state the executor belongs to
+     * @param executorId         id of the executor to create
+     * @param initialCredentials credentials passed to the executor factory
+     * @return the created executor
+     * @throws Exception if creating the executor fails
+     */
+    public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> initialCredentials)
+        throws Exception {
+        Executor executor = Executor.mkExecutor(workerState, executorId, initialCredentials);
+        executor.setLocalExecutorTransfer(new ExecutorTransfer(workerState, executor.getTransferWorkerQueue(),
+            executor.getStormConf()) {
+            @Override
+            public void transfer(int task, Tuple tuple) {
+                // Read the volatile once so a concurrent clearTrackId() cannot null it out
+                // between the check and the lookup.
+                final String currentTrackId = trackId;
+                if (null != currentTrackId) {
+                    ((AtomicInteger) ((Map) RegisteredGlobalState.getState(currentTrackId)).get("transferred")).incrementAndGet();
+                }
+                super.transfer(task, tuple);
+            }
+        });
+        return executor;
+    }
+
+    /**
+     * Sets the id under which the tracking state is registered.
+     *
+     * @param trackId id passed to {@link RegisteredGlobalState#getState}
+     */
+    public static void setTrackId(String trackId) {
+        LocalExecutor.trackId = trackId;
+    }
+
+    /** Clears the track id, which stops the counting of transferred tuples. */
+    public static void clearTrackId() {
+        LocalExecutor.trackId = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
new file mode 100644
index 0000000..4b6d0fa
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for the data kept about a tuple: task id, message id, stream,
+ * values, timestamp and an id.
+ */
+public class TupleInfo implements Serializable {
+
+    private static final long serialVersionUID = -3348670497595864118L;
+
+    // Field order is kept as is: the reflective toString() reads these fields.
+    private int taskId;
+    private Object messageId;
+    private String stream;
+    private List<Object> values;
+    private long timestamp;
+    private String id;
+
+    /** Returns the task id. */
+    public int getTaskId() {
+        return this.taskId;
+    }
+
+    /** Sets the task id. */
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    /** Returns the message id. */
+    public Object getMessageId() {
+        return this.messageId;
+    }
+
+    /** Sets the message id. */
+    public void setMessageId(Object messageId) {
+        this.messageId = messageId;
+    }
+
+    /** Returns the stream. */
+    public String getStream() {
+        return this.stream;
+    }
+
+    /** Sets the stream. */
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    /** Returns the values. */
+    public List<Object> getValues() {
+        return this.values;
+    }
+
+    /** Sets the values. */
+    public void setValues(List<Object> values) {
+        this.values = values;
+    }
+
+    /** Returns the timestamp. */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** Sets the timestamp. */
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /** Returns the id. */
+    public String getId() {
+        return this.id;
+    }
+
+    /** Sets the id. */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** Renders this object reflectively using the short-prefix style. */
+    @Override
+    public String toString() {
+        final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
+        return ToStringBuilder.reflectionToString(this, style);
+    }
+}