Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/08 08:17:37 UTC

[1/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java

Repository: storm
Updated Branches:
  refs/heads/master 44068c419 -> 28563ece1


http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
new file mode 100644
index 0000000..ec71ed4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReportError implements IReportError {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
+
+    private final Map stormConf;
+    private final IStormClusterState stormClusterState;
+    private final String stormId;
+    private final String componentId;
+    private final WorkerTopologyContext workerTopologyContext;
+
+    private int maxPerInterval;
+    private int errorIntervalSecs;
+    private AtomicInteger intervalStartTime;
+    private AtomicInteger intervalErrors;
+
+    public ReportError(Map stormConf, IStormClusterState stormClusterState, String stormId, String componentId, WorkerTopologyContext workerTopologyContext) {
+        this.stormConf = stormConf;
+        this.stormClusterState = stormClusterState;
+        this.stormId = stormId;
+        this.componentId = componentId;
+        this.workerTopologyContext = workerTopologyContext;
+        this.errorIntervalSecs = Utils.getInt(stormConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
+        this.maxPerInterval = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
+        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
+        this.intervalErrors = new AtomicInteger(0);
+    }
+
+    @Override
+    public void report(Throwable error) {
+        LOG.error("Error", error);
+        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
+            intervalErrors.set(0);
+            intervalStartTime.set(Time.currentTimeSecs());
+        }
+        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
+            try {
+                stormClusterState.reportError(stormId, componentId, Utils.hostname(stormConf),
+                        workerTopologyContext.getThisWorkerPort().longValue(), error);
+            } catch (UnknownHostException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+
+        }
+    }
+}
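
The report() method above throttles error reports to at most
TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL per TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
window, mirroring the Clojure throttled-report-error-fn removed later in this
commit. A stand-alone sketch of that throttle pattern, with hypothetical names
and System.currentTimeMillis() standing in for storm-core's Time utility:

import java.util.concurrent.atomic.AtomicInteger;

public class IntervalThrottle {
    private final int maxPerInterval;
    private final int intervalSecs;
    private final AtomicInteger intervalStart = new AtomicInteger(nowSecs());
    private final AtomicInteger intervalCount = new AtomicInteger(0);

    public IntervalThrottle(int maxPerInterval, int intervalSecs) {
        this.maxPerInterval = maxPerInterval;
        this.intervalSecs = intervalSecs;
    }

    private static int nowSecs() {
        return (int) (System.currentTimeMillis() / 1000);
    }

    // Returns true if the caller may report now; resets the window once it expires.
    public boolean tryAcquire() {
        if (nowSecs() - intervalStart.get() > intervalSecs) {
            intervalCount.set(0);
            intervalStart.set(nowSecs());
        }
        return intervalCount.incrementAndGet() <= maxPerInterval;
    }
}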

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
new file mode 100644
index 0000000..d326af1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReportErrorAndDie implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ReportErrorAndDie.class);
+    private final IReportError reportError;
+    private final Runnable suicideFn;
+
+    public ReportErrorAndDie(IReportError reportError, Runnable suicideFn) {
+        this.reportError = reportError;
+        this.suicideFn = suicideFn;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        try {
+            reportError.report(e);
+        } catch (Exception ex) {
+            LOG.error("Error while reporting error to cluster, proceeding with shutdown", ex);
+        }
+        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
+                || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
+            LOG.info("Got interrupted exception shutting thread down...");
+            suicideFn.run();
+        }
+    }
+}
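
ReportErrorAndDie is a plain Thread.UncaughtExceptionHandler, so it can be
installed on any executor thread. A minimal wiring sketch, assuming storm-core
on the classpath; startGuarded and its arguments are illustrative, and
mkSuicideFn is the helper added to Utils later in this diff:

import org.apache.storm.executor.error.IReportError;
import org.apache.storm.executor.error.ReportErrorAndDie;
import org.apache.storm.utils.Utils;

public class GuardedThreads {
    // Illustrative helper: run a task on a thread whose uncaught errors are
    // reported to the cluster, leaving the handler to decide whether to die.
    static Thread startGuarded(Runnable task, IReportError reportError) {
        Thread t = new Thread(task);
        t.setUncaughtExceptionHandler(new ReportErrorAndDie(reportError, Utils.mkSuicideFn()));
        t.start();
        return t;
    }
}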

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
new file mode 100644
index 0000000..a2ee650
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.Callable;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+            }
+        });
+
+        this.spoutThrottlingMetrics.registerAll(stormConf, idToTask.values().iterator().next().getUserContext());
+        this.outputCollectors = new ArrayList<>();
+        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+            Task taskData = entry.getValue();
+            ISpout spoutObject = (ISpout) taskData.getTaskObject();
+            SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
+                    spoutObject, this, taskData, entry.getKey(), emittedCount,
+                    hasAckers, rand, hasEventLoggers, isDebug, pending);
+            SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
+            this.outputCollectors.add(outputCollector);
+
+            taskData.getBuiltInMetrics().registerAll(stormConf, taskData.getUserContext());
+            Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+            BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, taskData.getUserContext());
+
+            if (spoutObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) spoutObject).setCredentials(credentials);
+            }
+            spoutObject.open(stormConf, taskData.getUserContext(), outputCollector);
+        }
+        openOrPrepareWasCalled.set(true);
+        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+        setupMetrics();
+    }
+
+    @Override
+    public Callable<Object> call() throws Exception {
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+
+        return new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                receiveQueue.consumeBatch(SpoutExecutor.this);
+
+                long currCount = emittedCount.get();
+                boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
+                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+                boolean isActive = stormActive.get();
+                if (isActive) {
+                    if (!lastActive.get()) {
+                        lastActive.set(true);
+                        LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
+                        for (ISpout spout : spouts) {
+                            spout.activate();
+                        }
+                    }
+                    if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+                        for (ISpout spout : spouts) {
+                            spout.nextTuple();
+                        }
+                    }
+                } else {
+                    if (lastActive.get()) {
+                        lastActive.set(false);
+                        LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
+                        for (ISpout spout : spouts) {
+                            spout.deactivate();
+                        }
+                    }
+                    Time.sleep(100);
+                    spoutThrottlingMetrics.skippedInactive(stats);
+                }
+                if (currCount == emittedCount.get() && isActive) {
+                    emptyEmitStreak.increment();
+                    spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+                    if (throttleOn) {
+                        spoutThrottlingMetrics.skippedThrottle(stats);
+                    } else if (reachedMaxSpoutPending) {
+                        spoutThrottlingMetrics.skippedMaxSpout(stats);
+                    }
+                } else {
+                    emptyEmitStreak.set(0);
+                }
+                return 0L;
+            }
+        };
+    }
+
+    @Override
+    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+        String streamId = tuple.getSourceStreamId();
+        if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+            pending.rotate();
+        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
+            metricsTick(idToTask.get(taskId), tuple);
+        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
+            Object spoutObj = idToTask.get(taskId).getTaskObject();
+            if (spoutObj instanceof ICredentialsListener) {
+                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
+            }
+        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
+            Long id = (Long) tuple.getValue(0);
+            TupleInfo pendingForId = pending.get(id);
+            if (pendingForId != null) {
+                pending.put(id, pendingForId);
+            }
+        } else {
+            Long id = (Long) tuple.getValue(0);
+            Long timeDeltaMs = (Long) tuple.getValue(1);
+            TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+            if (tupleInfo.getMessageId() != null) {
+                if (taskId != tupleInfo.getTaskId()) {
+                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
+                }
+                long startTimeMs = tupleInfo.getTimestamp();
+                Long timeDelta = null;
+                if (startTimeMs != 0) {
+                    timeDelta = timeDeltaMs;
+                }
+                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
+                    ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+                    failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+                }
+            }
+        }
+    }
+
+    public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+        try {
+            ISpout spout = (ISpout) taskData.getTaskObject();
+            int taskId = taskData.getTaskId();
+            if (executor.getIsDebug()) {
+                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
+            }
+            spout.ack(tupleInfo.getMessageId());
+            new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+            if (timeDelta != null) {
+                ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+        try {
+            ISpout spout = (ISpout) taskData.getTaskObject();
+            int taskId = taskData.getTaskId();
+            if (executor.getIsDebug()) {
+                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
+            }
+            spout.fail(tupleInfo.getMessageId());
+            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+            if (timeDelta != null) {
+                ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}
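
The emit gate inside call() is the heart of the flow control here: nextTuple()
only runs while the send queue has room, backpressure is off, and in-flight
tuples are under max-spout-pending. The same predicate in isolation (names
illustrative, not storm-core API):

public class SpoutGateSketch {
    // Mirrors the check in SpoutExecutor.call(): maxPending == 0 disables the cap.
    static boolean mayEmit(boolean sendQueueFull, boolean throttleOn, int pendingCount, int maxPending) {
        boolean reachedMaxSpoutPending = maxPending != 0 && pendingCount >= maxPending;
        return !sendQueueFull && !throttleOn && !reachedMaxSpoutPending;
    }

    public static void main(String[] args) {
        System.out.println(mayEmit(false, false, 10, 0));   // true: cap disabled
        System.out.println(mayEmit(false, false, 10, 10));  // false: at max pending
    }
}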

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
new file mode 100644
index 0000000..9fbb994
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+    private final SpoutExecutor executor;
+    private final Task taskData;
+    private final int taskId;
+    private final MutableLong emittedCount;
+    private final boolean hasAckers;
+    private final Random random;
+    private final Boolean isEventLoggers;
+    private final Boolean isDebug;
+    private final RotatingMap<Long, TupleInfo> pending;
+
+    @SuppressWarnings("unused")
+    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+                                    MutableLong emittedCount, boolean hasAckers, Random random,
+                                    Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
+        this.executor = executor;
+        this.taskData = taskData;
+        this.taskId = taskId;
+        this.emittedCount = emittedCount;
+        this.hasAckers = hasAckers;
+        this.random = random;
+        this.isEventLoggers = isEventLoggers;
+        this.isDebug = isDebug;
+        this.pending = pending;
+    }
+
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        return sendSpoutMsg(streamId, tuple, messageId, null);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        sendSpoutMsg(streamId, tuple, messageId, taskId);
+    }
+
+    @Override
+    public long getPendingCount() {
+        return pending.size();
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        executor.getReportError().report(error);
+    }
+
+    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+        emittedCount.increment();
+
+        List<Integer> outTasks;
+        if (outTaskId != null) {
+            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
+        } else {
+            outTasks = taskData.getOutgoingTasks(stream, values);
+        }
+
+        List<Long> ackSeq = new ArrayList<>();
+        boolean needAck = (messageId != null) && hasAckers;
+
+        long rootId = MessageId.generateId(random);
+        for (Integer t : outTasks) {
+            MessageId msgId;
+            if (needAck) {
+                long as = MessageId.generateId(random);
+                msgId = MessageId.makeRootId(rootId, as);
+                ackSeq.add(as);
+            } else {
+                msgId = MessageId.makeUnanchored();
+            }
+
+            TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
+            executor.getExecutorTransfer().transfer(t, tuple);
+        }
+        if (isEventLoggers) {
+            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+        }
+
+        boolean sample = false;
+        try {
+            sample = executor.getSampler().call();
+        } catch (Exception ignored) {
+        }
+        if (needAck) {
+            TupleInfo info = new TupleInfo();
+            info.setTaskId(this.taskId);
+            info.setStream(stream);
+            info.setMessageId(messageId);
+            if (isDebug) {
+                info.setValues(values);
+            }
+            if (sample) {
+                info.setTimestamp(System.currentTimeMillis());
+            }
+
+            pending.put(rootId, info);
+            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
+            executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+        } else if (messageId != null) {
+            TupleInfo info = new TupleInfo();
+            info.setStream(stream);
+            info.setValues(values);
+            info.setMessageId(messageId);
+            info.setTimestamp(0);
+            Long timeDelta = sample ? 0L : null;
+            info.setId("0:");
+            executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+        }
+
+        return outTasks;
+    }
+}
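
sendSpoutMsg anchors each outgoing tuple with a fresh 64-bit id and sends the
acker the XOR of all of them (Utils.bitXorVals(ackSeq)). The acker then XORs
each downstream ack back in, so its running value reaches zero exactly when the
whole tuple tree is acked. The arithmetic in miniature, with made-up anchor
values:

import java.util.Arrays;
import java.util.List;

public class XorAckSketch {
    static long xorAll(List<Long> vals) {
        long acc = 0L;
        for (long v : vals) {
            acc ^= v;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Long> anchors = Arrays.asList(0x9aL, 0x3cL, 0x77L);
        long ackerState = xorAll(anchors);   // value sent on ACKER_INIT_STREAM_ID
        for (long anchor : anchors) {
            ackerState ^= anchor;            // each ack XORs its anchor back in
        }
        System.out.println(ackerState == 0); // true: tuple tree fully acked
    }
}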

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index bfd0d36..6102549 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -84,6 +84,7 @@ public class BoltExecutorStats extends CommonStats {
 
     }
 
+    @Override
     public ExecutorStats renderStats() {
         ExecutorStats ret = new ExecutorStats();
         // common stats

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
index f7826f9..9b9a4f3 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -19,12 +19,13 @@ package org.apache.storm.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
 @SuppressWarnings("unchecked")
-public class CommonStats {
+public abstract class CommonStats {
     public static final int NUM_STAT_BUCKETS = 20;
 
     public static final String RATE = "rate";
@@ -109,4 +110,6 @@ public class CommonStats {
         return null;
     }
 
+    public abstract ExecutorStats renderStats();
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 28c885a..c7ba844 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -58,6 +58,7 @@ public class SpoutExecutorStats extends CommonStats {
         this.getFailed().incBy(stream, this.rate);
     }
 
+    @Override
     public ExecutorStats renderStats() {
         ExecutorStats ret = new ExecutorStats();
         // common fields

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
index 91cbee9..5092b1c 100644
--- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.task;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.StormTopology;
@@ -55,16 +56,25 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     private List<ITaskHook> _hooks = new ArrayList<>();
     private Map<String, Object> _executorData;
     private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
-    private clojure.lang.Atom _openOrPrepareWasCalled;
-
-
-    public TopologyContext(StormTopology topology, Map stormConf,
-            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
-            Map<String, Map<String, Fields>> componentToStreamToFields,
-            String stormId, String codeDir, String pidDir, Integer taskId,
-            Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
-            Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
-            clojure.lang.Atom openOrPrepareWasCalled) {
+    private AtomicBoolean _openOrPrepareWasCalled;
+
+
+    public TopologyContext(StormTopology topology,
+                           Map stormConf,
+                           Map<Integer, String> taskToComponent,
+                           Map<String, List<Integer>> componentToSortedTasks,
+                           Map<String, Map<String, Fields>> componentToStreamToFields,
+                           String stormId,
+                           String codeDir,
+                           String pidDir,
+                           Integer taskId,
+                           Integer workerPort,
+                           List<Integer> workerTasks,
+                           Map<String, Object> defaultResources,
+                           Map<String, Object> userResources,
+                           Map<String, Object> executorData,
+                           Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
+                           AtomicBoolean openOrPrepareWasCalled) {
         super(topology, stormConf, taskToComponent, componentToSortedTasks,
                 componentToStreamToFields, stormId, codeDir, pidDir,
                 workerPort, workerTasks, defaultResources, userResources);
@@ -312,7 +322,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
      * @return The IMetric argument unchanged.
      */
     public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
-        if((Boolean) _openOrPrepareWasCalled.deref()) {
+        if(_openOrPrepareWasCalled.get()) {
             throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                        "IBolt::prepare() or ISpout::open() method.");
         }
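
The swap in miniature: clojure.lang.Atom's deref() plus a Boolean cast becomes
a single AtomicBoolean.get(), with no behavior change to the guard. A minimal
illustration (guard message shortened, class name hypothetical):

import java.util.concurrent.atomic.AtomicBoolean;

public class GuardSketch {
    private final AtomicBoolean openOrPrepareWasCalled = new AtomicBoolean(false);

    void registerMetricGuard() {
        // Same check as above: reject registration once open()/prepare() has run.
        if (openOrPrepareWasCalled.get()) {
            throw new RuntimeException("registerMetric can only be called from open()/prepare()");
        }
    }
}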

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 026ad86..e17705e 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.utils;
 
+import clojure.lang.Keyword;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -1775,6 +1776,15 @@ public class Utils {
         Runtime.getRuntime().exit(val);
     }
 
+    public static Runnable mkSuicideFn() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                Utils.exitProcess(1, "Worker died");
+            }
+        };
+    }
+
     /**
      * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
      *
@@ -2381,4 +2391,23 @@ public class Utils {
         return rtn;
     }
 
+    /**
+     * converts a clojure PersistentMap to java HashMap
+     */
+    public static Map<String, Object> convertClojureMapToJavaMap(Map map) {
+        Map<String, Object> ret = new HashMap<>(map.size());
+        for (Object obj : map.entrySet()) {
+            Map.Entry entry = (Map.Entry) obj;
+            Keyword keyword = (Keyword) entry.getKey();
+            String key = keyword.getName();
+            if (key.startsWith(":")) {
+                key = key.substring(1, key.length());
+            }
+            Object value = entry.getValue();
+            ret.put(key, value);
+        }
+
+        return ret;
+    }
+
 }
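
A round-trip sketch for the new converter, assuming Clojure on the classpath;
Keyword.intern is the standard way to build a :port-style key from Java, and
the values here are made up:

import clojure.lang.Keyword;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.utils.Utils;

public class ClojureMapSketch {
    public static void main(String[] args) {
        Map<Keyword, Object> cljStyle = new HashMap<>();
        cljStyle.put(Keyword.intern("port"), 6701);      // Clojure-side {:port 6701}
        Map<String, Object> javaMap = Utils.convertClojureMapToJavaMap(cljStyle);
        System.out.println(javaMap.get("port"));         // 6701, keyed by the bare name
    }
}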

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
index f3b5a66..b7966e0 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
@@ -24,9 +24,9 @@ import org.slf4j.LoggerFactory;
 public class WorkerBackpressureThread extends Thread {
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
-    private Object trigger;
-    private Object workerData;
-    private WorkerBackpressureCallback callback;
+    private final Object trigger;
+    private final Object workerData;
+    private final WorkerBackpressureCallback callback;
     private volatile boolean running = true;
 
     public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
@@ -38,7 +38,7 @@ public class WorkerBackpressureThread extends Thread {
         this.setUncaughtExceptionHandler(new BackpressureUncaughtExceptionHandler());
     }
 
-    static public void notifyBackpressureChecker(Object trigger) {
+    static public void notifyBackpressureChecker(final Object trigger) {
         try {
             synchronized (trigger) {
                 trigger.notifyAll();
@@ -57,7 +57,7 @@ public class WorkerBackpressureThread extends Thread {
     public void run() {
         while (running) {
             try {
-                synchronized(trigger) {
+                synchronized (trigger) {
                     trigger.wait(100);
                 }
                 callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
@@ -70,6 +70,7 @@ public class WorkerBackpressureThread extends Thread {
 
 class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
+
     @Override
     public void uncaughtException(Thread t, Throwable e) {
         // note that exception that happens during connecting to ZK has been ignored in the callback implementation
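
The thread's run() loop above is the classic bounded wait/notify check: the
checker sleeps at most 100 ms on a shared monitor, and executors call
notifyBackpressureChecker() to wake it early when a backpressure flag flips.
The pattern in isolation (names illustrative):

public class MonitorPollSketch {
    private final Object trigger = new Object();
    private volatile boolean running = true;

    // Checker side: wake at least every 100 ms, or immediately on notify.
    void checkLoop(Runnable check) throws InterruptedException {
        while (running) {
            synchronized (trigger) {
                trigger.wait(100);
            }
            check.run();
        }
    }

    // Producer side: wake the checker early.
    void notifyChecker() {
        synchronized (trigger) {
            trigger.notifyAll();
        }
    }
}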

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index b4827e9..787fbcd 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -403,17 +403,18 @@
 (def bolt-prepared? (atom false))
 (defbolt prepare-tracked-bolt [] {:prepare true}
   [conf context collector]  
-  (reset! bolt-prepared? true)
   (bolt
    (execute [tuple]
+            (reset! bolt-prepared? true)
             (ack! collector tuple))))
 
 (def spout-opened? (atom false))
 (defspout open-tracked-spout ["val"]
   [conf context collector]
-  (reset! spout-opened? true)
   (spout
-   (nextTuple [])))
+    (nextTuple []
+      (reset! spout-opened? true)
+      )))
 
 (deftest test-submit-inactive-topology
   (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
@@ -440,8 +441,8 @@
       (advance-cluster-time cluster 9)
       (is (not @bolt-prepared?))
       (is (not @spout-opened?))        
-      (.activate (:nimbus cluster) "test")              
-      
+      (.activate (:nimbus cluster) "test")
+
       (advance-cluster-time cluster 12)
       (assert-acked tracker 1)
       (is @bolt-prepared?)

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 487d80f..dc56f81 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -20,7 +20,7 @@
   (:import [org.apache.storm.grouping LoadMapping])
   (:use [org.apache.storm testing log config])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon common executor])
+  (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils]
            (org.apache.storm.daemon GrouperFactory)))


[5/5] storm git commit: add STORM-1277 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1277 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28563ece
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28563ece
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28563ece

Branch: refs/heads/master
Commit: 28563ece16274bcd61827f38e960ce2d27a57dea
Parents: c7d450f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 8 17:15:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:15:20 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28563ece/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bdd4e71..4c8b35e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1277: port backtype.storm.daemon.executor to java
  * STORM-2020: Stop using sun internal classes.
  * STORM-2021: Fix license.
  * STORM-2022: fix FieldsTest


[3/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java

Posted by ka...@apache.org.
STORM-1277 port backtype.storm.daemon.executor to java

* code rebased by Jungtaek Lim <ka...@gmail.com>
* Closes #1445


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5e19d9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5e19d9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5e19d9b

Branch: refs/heads/master
Commit: a5e19d9b8064f83adf00190ed74518e2156faae2
Parents: 44068c4
Author: 卫乐 <we...@taobao.com>
Authored: Fri Apr 29 17:58:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:13:48 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  18 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 839 -------------------
 .../org/apache/storm/daemon/local_executor.clj  |  42 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |  70 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  16 +-
 .../src/jvm/org/apache/storm/Constants.java     |  24 +-
 .../org/apache/storm/daemon/StormCommon.java    | 175 ++--
 .../src/jvm/org/apache/storm/daemon/Task.java   |  76 +-
 .../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
 .../apache/storm/executor/ExecutorShutdown.java | 111 +++
 .../apache/storm/executor/ExecutorTransfer.java |  87 ++
 .../apache/storm/executor/IRunningExecutor.java |  31 +
 .../org/apache/storm/executor/TupleInfo.java    |  90 ++
 .../storm/executor/bolt/BoltExecutor.java       | 138 +++
 .../executor/bolt/BoltOutputCollectorImpl.java  | 171 ++++
 .../storm/executor/error/IReportError.java      |  22 +
 .../storm/executor/error/ReportError.java       |  76 ++
 .../storm/executor/error/ReportErrorAndDie.java |  47 ++
 .../storm/executor/spout/SpoutExecutor.java     | 255 ++++++
 .../spout/SpoutOutputCollectorImpl.java         | 147 ++++
 .../apache/storm/stats/BoltExecutorStats.java   |   1 +
 .../jvm/org/apache/storm/stats/CommonStats.java |   5 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   1 +
 .../org/apache/storm/task/TopologyContext.java  |  32 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  29 +
 .../storm/utils/WorkerBackpressureThread.java   |  11 +-
 .../org/apache/storm/integration_test.clj       |  11 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index cc5436c..01a49b3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -52,20 +52,4 @@
       (catch Throwable t#
         (log-error t# "Error on initialization of server " ~(str name))
         (Utils/exitProcess 13 "Error on initialization")
-        )))))
-
-(defn worker-context [worker]
-  (WorkerTopologyContext. (:system-topology worker)
-                          (:storm-conf worker)
-                          (:task->component worker)
-                          (:component->sorted-tasks worker)
-                          (:component->stream->fields worker)
-                          (:storm-id worker)
-                          (ConfigUtils/supervisorStormResourcesPath
-                            (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
-                          (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
-                          (:port worker)
-                          (:task-ids worker)
-                          (:default-shared-resources worker)
-                          (:user-shared-resources worker)
-                          ))
+        )))))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
deleted file mode 100644
index 1fdfbf5..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ /dev/null
@@ -1,839 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.executor
-  (:use [org.apache.storm.daemon common])
-  (:use [clojure.walk])
-  (:import [org.apache.storm.generated Grouping Grouping$_Fields]
-           [java.io Serializable]
-           [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
-           [org.apache.storm.daemon.metrics BuiltinMetricsUtil SpoutThrottlingMetrics])
-  (:use [org.apache.storm util config log])
-  (:import [java.util List Random HashMap ArrayList LinkedList Map])
-  (:import [org.apache.storm ICredentialsListener Thrift])
-  (:import [org.apache.storm.hooks ITaskHook])
-  (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
-  (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
-  (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
-            EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
-  (:import [org.apache.storm.grouping CustomStreamGrouping])
-  (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
-  (:import [org.apache.storm.generated GlobalStreamId])
-  (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
-  (:import [com.lmax.disruptor InsufficientCapacityException])
-  (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
-  (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
-  (:import [org.apache.storm Config Constants])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
-  (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-  (:import [java.lang Thread Thread$UncaughtExceptionHandler]
-           [java.util.concurrent ConcurrentLinkedQueue]
-           [org.json.simple JSONValue]
-           [com.lmax.disruptor.dsl ProducerType]
-           [org.apache.storm StormTimer])
-  (:require [clojure.set :as set]))
-
-
-;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
-(defn- outbound-groupings
-  [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
-  (->> component->grouping
-       (filter-key #(-> worker-context
-                        (.getComponentTasks %)
-                        count
-                        pos?))
-       (map (fn [[component tgrouping]]
-               [component
-                (GrouperFactory/mkGrouper worker-context
-                                this-component-id
-                                stream-id
-                                out-fields
-                                tgrouping
-                                (.getComponentTasks worker-context component)
-                                topo-conf)]))
-       (into {})
-       (HashMap.)))
-
-(defn outbound-components
-  "Returns map of stream id to component id to grouper"
-  [^WorkerTopologyContext worker-context component-id topo-conf]
-  (->> (.getTargets worker-context component-id)
-        clojurify-structure
-        (map (fn [[stream-id component->grouping]]
-               [stream-id
-                (outbound-groupings
-                  worker-context
-                  component-id
-                  stream-id
-                  (.getComponentOutputFields worker-context component-id stream-id)
-                  component->grouping
-                  topo-conf)]))
-         (into (apply merge (map #(hash-map % nil) (.keySet (.get_streams (.getComponentCommon worker-context component-id))))))
-         (HashMap.)))
-
-(defn executor-type [^WorkerTopologyContext context component-id]
-  (let [topology (.getRawTopology context)
-        spouts (.get_spouts topology)
-        bolts (.get_bolts topology)]
-    (cond (contains? spouts component-id) "spout"
-          (contains? bolts component-id) "bolt"
-          :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
-
-(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
-
-(defmulti mk-threads executor-selector)
-(defmulti mk-executor-stats executor-selector)
-(defmulti close-component executor-selector)
-
-(defn- normalized-component-conf [storm-conf general-context component-id]
-  (let [to-remove (disj (set ALL-CONFIGS)
-                        TOPOLOGY-DEBUG
-                        TOPOLOGY-MAX-SPOUT-PENDING
-                        TOPOLOGY-MAX-TASK-PARALLELISM
-                        TOPOLOGY-TRANSACTIONAL-ID
-                        TOPOLOGY-TICK-TUPLE-FREQ-SECS
-                        TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
-                        TOPOLOGY-SPOUT-WAIT-STRATEGY
-                        TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
-                        TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
-                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
-                        TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
-                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
-                        TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
-                        TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
-                        TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
-                        TOPOLOGY-STATE-PROVIDER
-                        TOPOLOGY-STATE-PROVIDER-CONFIG
-                        )
-        spec-conf (-> general-context
-                      (.getComponentCommon component-id)
-                      .get_json_conf
-                      (#(if % (JSONValue/parse %)))
-                      clojurify-structure)]
-    (merge storm-conf (apply dissoc spec-conf to-remove))
-    ))
-
-(defprotocol RunningExecutor
-  (render-stats [this])
-  (get-executor-id [this])
-  (credentials-changed [this creds])
-  (get-backpressure-flag [this]))
-
-(defn throttled-report-error-fn [executor]
-  (let [storm-conf (:storm-conf executor)
-        error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
-        max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
-        interval-start-time (atom (Time/currentTimeSecs))
-        interval-errors (atom 0)
-        ]
-    (fn [error]
-      (log-error error)
-      (when (> (Time/deltaSecs @interval-start-time)
-               error-interval-secs)
-        (reset! interval-errors 0)
-        (reset! interval-start-time (Time/currentTimeSecs)))
-      (swap! interval-errors inc)
-
-      (when (<= @interval-errors max-per-interval)
-        (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
-                              (Utils/hostname storm-conf)
-          (long (.getThisWorkerPort (:worker-context executor))) error)
-        ))))
-
-;; in its own function so that it can be mocked out by tracked topologies
-(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
-  (fn this
-    [task tuple]
-    (let [val (AddressedTuple. task tuple)]
-      (when (= true (storm-conf TOPOLOGY-DEBUG))
-        (log-message "TRANSFERING tuple " val))
-      (.publish ^DisruptorQueue batch-transfer->worker val))))
-
-(defn mk-executor-data [worker executor-id]
-  (let [worker-context (worker-context worker)
-        task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
-        component-id (.getComponentId worker-context (first task-ids))
-        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
-        executor-type (executor-type worker-context component-id)
-        batch-transfer->worker (DisruptorQueue.
-                                  (str "executor"  executor-id "-send-queue")
-                                  ProducerType/SINGLE
-                                  (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                  (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        ]
-    (recursive-map
-     :worker worker
-     :worker-context worker-context
-     :executor-id executor-id
-     :task-ids task-ids
-     :component-id component-id
-     :open-or-prepare-was-called? (atom false)
-     :storm-conf storm-conf
-     :receive-queue ((:executor-receive-queue-map worker) executor-id)
-     :storm-id (:storm-id worker)
-     :conf (:conf worker)
-     :shared-executor-data (HashMap.)
-     :storm-active-atom (:storm-active-atom worker)
-     :storm-component->debug-atom (:storm-component->debug-atom worker)
-     :batch-transfer-queue batch-transfer->worker
-     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
-     :suicide-fn (:suicide-fn worker)
-     :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
-                            (ClusterStateContext. DaemonType/WORKER))
-     :type executor-type
-     ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
-     :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
-     :interval->task->metric-registry (HashMap.)
-     :task->component (:task->component worker)
-     :stream->component->grouper (outbound-components worker-context component-id storm-conf)
-     :report-error (throttled-report-error-fn <>)
-     :report-error-and-die (reify
-                             Thread$UncaughtExceptionHandler
-                             (uncaughtException [this _ error]
-                               (try
-                                 ((:report-error <>) error)
-                                 (catch Exception e
-                                   (log-error e "Error while reporting error to cluster, proceeding with shutdown")))
-                               (if (or
-                                    (Utils/exceptionCauseIsInstanceOf InterruptedException error)
-                                    (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
-                                 (log-message "Got interrupted exception shutting thread down...")
-                                 ((:suicide-fn <>)))))
-     :sampler (ConfigUtils/mkStatsSampler storm-conf)
-     :backpressure (atom false)
-     :spout-throttling-metrics (if (= (keyword executor-type) :spout)
-                                 (SpoutThrottlingMetrics.)
-                                nil)
-     ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
-     )))
-
-(defn- mk-disruptor-backpressure-handler [executor-data]
-  "make a handler for the executor's receive disruptor queue to
-  check highWaterMark and lowWaterMark for backpressure"
-  (reify DisruptorBackpressureCallback
-    (highWaterMark [this]
-      "When receive queue is above highWaterMark"
-      (if (not @(:backpressure executor-data))
-        (do (reset! (:backpressure executor-data) true)
-            (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
-    (lowWaterMark [this]
-      "When receive queue is below lowWaterMark"
-      (if @(:backpressure executor-data)
-        (do (reset! (:backpressure executor-data) false)
-            (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
-            (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
-
-(defn start-batch-transfer->worker-handler! [worker executor-data]
-  (let [worker-transfer-fn (:transfer-fn worker)
-        cached-emit (MutableObject. (ArrayList.))
-        storm-conf (:storm-conf executor-data)
-        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
-        ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
-        handler (reify com.lmax.disruptor.EventHandler
-                  (onEvent [this o seq-id batch-end?]
-                    (let [^ArrayList alist (.getObject cached-emit)]
-                      (.add alist o)
-                      (when batch-end?
-                        (worker-transfer-fn serializer alist)
-                        (.setObject cached-emit (ArrayList.))))))
-        ]
-    (Utils/asyncLoop
-      (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
-      (.getName batch-transfer-queue)
-      (:uncaught-exception-handler (:report-error-and-die executor-data)))))
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
-  [^Task task-data stream values transfer-fn]
-  (let [out-tuple (.getTuple task-data stream values)]
-    (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
-                    (transfer-fn t out-tuple))))
-
-(defn setup-metrics! [executor-data]
-  (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
-        distinct-time-bucket-intervals (keys interval->task->metric-registry)]
-    (doseq [interval distinct-time-bucket-intervals]
-      (.scheduleRecurring
-        (:user-timer (:worker executor-data))
-        interval
-        interval
-        (fn []
-          (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-            (.publish ^DisruptorQueue receive-queue val)))))))
-
-(defn metrics-tick
-  [executor-data task-data ^TupleImpl tuple]
-   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
-         interval (.getInteger tuple 0)
-         transfer-fn (:transfer-fn executor-data)
-         task-id (.getTaskId task-data)
-         name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
-         task-info (IMetricsConsumer$TaskInfo.
-                     (Utils/hostname (:storm-conf executor-data))
-                     (.getThisWorkerPort worker-context)
-                     (:component-id executor-data)
-                     task-id
-                     (long (Time/currentTimeSecs))
-                     interval)
-         data-points (->> name->imetric
-                          (map (fn [[name imetric]]
-                                 (let [value (.getValueAndReset ^IMetric imetric)]
-                                   (if value
-                                     (IMetricsConsumer$DataPoint. name value)))))
-                          (filter identity)
-                          (into []))]
-     (when (seq data-points)
-       (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
-
-(defn setup-ticks! [worker executor-data]
-  (let [storm-conf (:storm-conf executor-data)
-        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
-        receive-queue (:receive-queue executor-data)
-        context (:worker-context executor-data)]
-    (when tick-time-secs
-      (if (or (Utils/isSystemId (:component-id executor-data))
-              (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
-                   (= :spout (keyword (:type executor-data)))))
-        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (.scheduleRecurring
-          (:user-timer worker)
-          tick-time-secs
-          tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-              (.publish ^DisruptorQueue receive-queue val))))))))
-
-(defn mk-executor [worker executor-id initial-credentials]
-  (let [executor-data (mk-executor-data worker executor-id)
-        transfer-fn (:transfer-fn executor-data)
-        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
-        task-datas (->> executor-data
-                        :task-ids
-                        (map (fn [t] (let [task (Task. (stringify-keys executor-data)  t)
-                                           stream StormCommon/SYSTEM_STREAM_ID
-                                           values ["startup"]]
-                                       ;; when this is called, the threads for the executor haven't been started yet,
-                                       ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
-                                       (send-unanchored task stream values transfer-fn)
-                                       [t task]
-                                       )))
-                        (into {})
-                        (HashMap.))
-        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
-        report-error-and-die (:report-error-and-die executor-data)
-        component-id (:component-id executor-data)
-
-
-        disruptor-handler (mk-disruptor-backpressure-handler executor-data)
-        _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
-        _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
-              (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
-
-        ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
-        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
-        ;; trick isn't thread-safe)
-        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
-        handlers (try
-                   (mk-threads executor-data task-datas initial-credentials)
-                   (catch Throwable t (.uncaughtException report-error-and-die nil t)))
-        threads (concat handlers system-threads)]    
-    (setup-ticks! worker executor-data)
-
-    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
-    ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
-    (reify
-      RunningExecutor
-      (render-stats [this]
-        (.renderStats (:stats executor-data)))
-      (get-executor-id [this]
-        executor-id)
-      (credentials-changed [this creds]
-        (let [receive-queue (:receive-queue executor-data)
-              context (:worker-context executor-data)
-              val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
-          (.publish ^DisruptorQueue receive-queue val)))
-      (get-backpressure-flag [this]
-        @(:backpressure executor-data))
-      Shutdownable
-      (shutdown
-        [this]
-        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
-        (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
-        (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data))
-        (doseq [t threads]
-          (.interrupt t)
-          (.join t))
-
-        (.cleanupStats (:stats executor-data))
-        (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
-          (doseq [hook (.getHooks user-context)]
-            (.cleanup hook)))
-        (.disconnect (:storm-cluster-state executor-data))
-        (when @(:open-or-prepare-was-called? executor-data)
-          (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
-            (close-component executor-data obj)))
-        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
-        )))
-
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
-  (let [^ISpout spout (.getTaskObject task-data)
-        storm-conf (:storm-conf executor-data)
-        task-id (.getTaskId task-data)]
-    ;;TODO: need to throttle these when there's lots of failures
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
-    (.fail spout msg-id)
-    (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
-    (when time-delta
-      (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
-  (let [storm-conf (:storm-conf executor-data)
-        ^ISpout spout (.getTaskObject task-data)
-        task-id (.getTaskId task-data)]
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      (log-message "SPOUT Acking message " id " " msg-id))
-    (.ack spout msg-id)
-    (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
-    (when time-delta
-      (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn mk-task-receiver [executor-data tuple-action-fn]
-  (let [task-ids (:task-ids executor-data)
-        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-        ]
-    (reify com.lmax.disruptor.EventHandler
-      (onEvent [this tuple-batch sequence-id end-of-batch?]
-        (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
-          (let [^TupleImpl tuple (.getTuple addressed-tuple)
-                task-id (.getDest addressed-tuple)]
-            (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
-            (if (not= task-id AddressedTuple/BROADCAST_DEST)
-              (tuple-action-fn task-id tuple)
-              ;; tuples addressed to BROADCAST_DEST are delivered to every task
-              (fast-list-iter [task-id task-ids]
-                (tuple-action-fn task-id tuple)
-                ))
-            ))))))
-
-(defn executor-max-spout-pending [storm-conf num-tasks]
-  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
-    (if p (* p num-tasks))))
-
-(defn init-spout-wait-strategy [storm-conf]
-  (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
-    (.prepare ret storm-conf)
-    ret
-    ))
-
-;; Send sampled data to the eventlogger if the global or component level
-;; debug flag is set (via nimbus api).
-(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
-    (let [c->d @(:storm-component->debug-atom executor-data)
-          options (get c->d component-id (get c->d (:storm-id executor-data)))
-          spct    (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
-      ;; the thread's initialized random number generator is used to generate
-      ;; uniformly distributed random numbers.
-      (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
-        (send-unanchored
-          task-data
-          StormCommon/EVENTLOGGER_STREAM_ID
-          [component-id message-id (System/currentTimeMillis) values]
-          (:transfer-fn executor-data)))))
-
-(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
-  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
-        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
-        max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
-        ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))        
-        last-active (atom false)        
-        spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
-        rand (Random. (Utils/secureRandomLong))
-        ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))
-
-        pending (RotatingMap.
-                 2 ;; microoptimize for performance of .size method
-                 (reify RotatingMap$ExpiredCallback
-                   (expire [this id [task-id spout-id tuple-info start-time-ms]]
-                     (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
-                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
-                       ))))
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
-                                (let [task-data (get task-datas task-id)
-                                      spout-obj (.getTaskObject task-data)]
-                                  (when (instance? ICredentialsListener spout-obj)
-                                    (.setCredentials spout-obj (.getValue tuple 0))))
-                              Acker/ACKER_RESET_TIMEOUT_STREAM_ID 
-                                (let [id (.getValue tuple 0)
-                                      pending-for-id (.get pending id)]
-                                   (when pending-for-id
-                                     (.put pending id pending-for-id))) 
-                              (let [id (.getValue tuple 0)
-                                    time-delta-ms (.getValue tuple 1)
-                                    [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
-                                (when spout-id
-                                  (when-not (= stored-task-id task-id)
-                                    (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
-                                  (let [time-delta (if start-time-ms time-delta-ms)]
-                                    (condp = stream-id
-                                      Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
-                                                                               spout-id tuple-finished-info time-delta id)
-                                      Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
-                                                                           spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
-                                      )))
-                                ;; TODO: on failure, emit tuple to failure stream
-                                ))))
-        receive-queue (:receive-queue executor-data)
-        event-handler (mk-task-receiver executor-data tuple-action-fn)
-        has-ackers? (StormCommon/hasAckers storm-conf)
-        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
-        emitted-count (MutableLong. 0)
-        empty-emit-streak (MutableLong. 0)
-        spout-transfer-fn (fn []
-                            ;; If the topology was started in the inactive state, don't call (.open spout) until it is activated.
-                            (while (not @(:storm-active-atom executor-data))
-                              (Thread/sleep 100))
-                            (log-message "Opening spout " component-id ":" (keys task-datas))
-                            (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
-                            (doseq [[task-id task-data] task-datas
-                                    :let [^ISpout spout-obj (.getTaskObject task-data)
-                                          send-spout-msg (fn [out-stream-id values message-id out-task-id]
-                                                           (.increment emitted-count)
-                                                           (let [out-tasks (if out-task-id
-                                                                             (.getOutgoingTasks task-data out-task-id out-stream-id values)
-                                                                             (.getOutgoingTasks task-data out-stream-id values))
-                                                                 rooted? (and message-id has-ackers?)
-                                                                 root-id (if rooted? (MessageId/generateId rand))
-                                                                 ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
-                                                             (fast-list-iter [out-task out-tasks id out-ids]
-                                                                             (let [tuple-id (if rooted?
-                                                                                              (MessageId/makeRootId root-id id)
-                                                                                              (MessageId/makeUnanchored))
-                                                                                   out-tuple (TupleImpl. worker-context
-                                                                                                         values
-                                                                                                         task-id
-                                                                                                         out-stream-id
-                                                                                                         tuple-id)]
-                                                                               (transfer-fn out-task out-tuple)))
-                                                             (if has-eventloggers?
-                                                               (send-to-eventlogger executor-data task-data values component-id message-id rand))
-                                                             (if (and rooted?
-                                                                      (not (.isEmpty out-ids)))
-                                                               (do
-                                                                 (.put pending root-id [task-id
-                                                                                        message-id
-                                                                                        {:stream out-stream-id 
-                                                                                         :values (if debug? values nil)}
-                                                                                        (if (.call ^Callable sampler) (System/currentTimeMillis))])
-                                                                 (send-unanchored task-data
-                                                                                       Acker/ACKER_INIT_STREAM_ID
-                                                                                       [root-id (Utils/bitXorVals out-ids) task-id]
-                                                                                       (:transfer-fn executor-data)))
-                                                               (when message-id
-                                                                 (ack-spout-msg executor-data task-data message-id
-                                                                                {:stream out-stream-id :values values}
-                                                                                (if (.call ^Callable sampler) 0) "0:")))
-                                                             (or out-tasks [])))]]
-
-                              (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
-                              (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                       "receive" receive-queue}
-                                                                      storm-conf (.getUserContext task-data))
-
-                              (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
-                              (.open spout-obj
-                                     storm-conf
-                                     (.getUserContext task-data)
-                                     (SpoutOutputCollector.
-                                      (reify ISpoutOutputCollector
-                                        (^long getPendingCount[this]
-                                          (.size pending))
-                                        (^List emit [this ^String stream-id ^List tuple ^Object message-id]
-                                          (send-spout-msg stream-id tuple message-id nil))
-                                        (^void emitDirect [this ^int out-task-id ^String stream-id
-                                                           ^List tuple ^Object message-id]
-                                          (send-spout-msg stream-id tuple message-id out-task-id))
-                                        (reportError [this error]
-                                          (report-error error))))))
-
-                            (reset! open-or-prepare-was-called? true) 
-                            (log-message "Opened spout " component-id ":" (keys task-datas))
-                            (setup-metrics! executor-data)
-
-                            (fn []
-                              ;; This design requires that spouts be non-blocking
-                              (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-
-                              (let [active? @(:storm-active-atom executor-data)
-                                    curr-count (.get emitted-count)
-                                    backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
-                                    throttle-on (and backpressure-enabled
-                                                     @(:throttle-on (:worker executor-data)))
-                                    reached-max-spout-pending (and max-spout-pending
-                                                                   (>= (.size pending) max-spout-pending))]
-                                (if active?
-                                        ; activated
-                                  (do
-                                    (when-not @last-active
-                                      (reset! last-active true)
-                                      (log-message "Activating spout " component-id ":" (keys task-datas))
-                                      (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
-                                    (if (and (not (.isFull transfer-queue))
-                                             (not throttle-on)
-                                             (not reached-max-spout-pending))
-                                      (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
-                                        ; deactivated
-                                  (do
-                                    (when @last-active
-                                      (reset! last-active false)
-                                      (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                                      (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                                    ;; TODO: log that it's getting throttled
-                                    (Time/sleep 100)
-                                    (.skippedInactive (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
-                                (if (and (= curr-count (.get emitted-count)) active?)
-                                  (do (.increment empty-emit-streak)
-                                      (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
-                                      ;; update the spout throttling metrics
-                                      (if throttle-on
-                                        (.skippedThrottle (:spout-throttling-metrics executor-data) (:stats executor-data))
-                                        (if reached-max-spout-pending
-                                          (.skippedMaxSpout (:spout-throttling-metrics executor-data) (:stats executor-data)))))
-                                  (.set empty-emit-streak 0)))
-                              0))]
-
-    [(Utils/asyncLoop
-      spout-transfer-fn
-      false ; isDaemon
-      (:report-error-and-die executor-data)
-      Thread/NORM_PRIORITY
-      true ; isFactory
-      true ; startImmediately
-      (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defn- tuple-time-delta! [^TupleImpl tuple]
-  (let [ms (.getProcessSampleStartTime tuple)]
-    (if ms
-      (Time/deltaMs ms))))
-      
-(defn- tuple-execute-time-delta! [^TupleImpl tuple]
-  (let [ms (.getExecuteSampleStartTime tuple)]
-    (if ms
-      (Time/deltaMs ms))))
-
-(defn put-xor! [^Map pending key id]
-  (let [curr (or (.get pending key) (long 0))]
-    (.put pending key (bit-xor curr id))))
-
-(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
-  (let [storm-conf (:storm-conf executor-data)
-        execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
-        executor-stats (:stats executor-data)
-        {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
-                open-or-prepare-was-called?]} executor-data
-        rand (Random. (Utils/secureRandomLong))
-
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          ;; synchronization needs to be done with a key provided by this bolt, otherwise:
-                          ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
-                          ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
-                          ;; buffer other tuples until fully synchronized, then process all of those tuples
-                          ;; then go into normal loop
-                          ;; spill to disk?
-                          ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
-                          ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
-                          ;; or just timeout the sync messages that are coming in until full sync is hit from that task
-                          ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
-                          ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
-                          ;; TODO: how to handle incremental updates as well as synchronizations at same time
-                          ;; TODO: need to version tuples somehow
-
-                          ;;(log-debug "Received tuple " tuple " at task " task-id)
-                          ;; need to do it this way to avoid reflection
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
-                                (let [^Task task-data (get task-datas task-id)
-                                      bolt-obj (.getTaskObject task-data)]
-                                  (when (instance? ICredentialsListener bolt-obj)
-                                    (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              (let [^Task task-data (get task-datas task-id)
-                                    ^IBolt bolt-obj (.getTaskObject task-data)
-                                    user-context (.getUserContext task-data)
-                                    sampler? (.call ^Callable sampler)
-                                    execute-sampler? (.call ^Callable execute-sampler)
-                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
-                                    receive-queue (:receive-queue executor-data)]
-                                (when sampler?
-                                  (.setProcessSampleStartTime tuple now))
-                                (when execute-sampler?
-                                  (.setExecuteSampleStartTime tuple now))
-                                (.execute bolt-obj tuple)
-                                (let [delta (tuple-execute-time-delta! tuple)]
-                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
-                                    (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-
-                                  (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
-                                  (when delta
-                                    (.boltExecuteTuple executor-stats
-                                                               (.getSourceComponent tuple)
-                                                               (.getSourceStreamId tuple)
-                                                               delta)))))))
-        has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
-        bolt-transfer-fn (fn []
-                           ;; If the topology was started in the inactive state, don't call the bolt's prepare until it is activated.
-                           (while (not @(:storm-active-atom executor-data))
-                             (Thread/sleep 100))
-
-                           (log-message "Preparing bolt " component-id ":" (keys task-datas))
-                           (doseq [[task-id task-data] task-datas
-                                   :let [^IBolt bolt-obj (.getTaskObject task-data)
-                                         user-context (.getUserContext task-data)
-                                         transfer-fn (:transfer-fn executor-data)
-                                         bolt-emit (fn [stream anchors values task]
-                                                     (let [out-tasks (if task
-                                                                       (.getOutgoingTasks task-data task stream values)
-                                                                       (.getOutgoingTasks task-data stream values))]
-                                                       (fast-list-iter [t out-tasks]
-                                                                       (let [anchors-to-ids (HashMap.)]
-                                                                         (fast-list-iter [^TupleImpl a anchors]
-                                                                                         (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
-                                                                                           (when (pos? (count root-ids))
-                                                                                             (let [edge-id (MessageId/generateId rand)]
-                                                                                               (.updateAckVal a edge-id)
-                                                                                               (fast-list-iter [root-id root-ids]
-                                                                                                               (put-xor! anchors-to-ids root-id edge-id))))))
-                                                                         (let [tuple (TupleImpl. worker-context
-                                                                                                 values
-                                                                                                 task-id
-                                                                                                 stream
-                                                                                                 (MessageId/makeId anchors-to-ids))]
-                                                                           (transfer-fn t tuple))))
-                                                       (if has-eventloggers?
-                                                         (send-to-eventlogger executor-data task-data values component-id nil rand))
-                                                       (or out-tasks [])))]]
-                             (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
-                             (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
-                             (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-                               (do
-                                 (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                          "receive" (:receive-queue executor-data)
-                                                                          "transfer" (:transfer-queue (:worker executor-data))}
-                                                                         storm-conf user-context)
-                                 (BuiltinMetricsUtil/registerIconnectionClientMetrics
-                                   (.deref (:cached-node+port->socket (:worker executor-data))) storm-conf user-context)
-                                 (BuiltinMetricsUtil/registerIconnectionServerMetric (:receiver (:worker executor-data)) storm-conf user-context))
-                               (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
-                                                                        "receive" (:receive-queue executor-data)}
-                                                                       storm-conf user-context))
-
-                             (.prepare bolt-obj
-                                       storm-conf
-                                       user-context
-                                       (OutputCollector.
-                                        (reify IOutputCollector
-                                          (emit [this stream anchors values]
-                                            (bolt-emit stream anchors values nil))
-                                          (emitDirect [this task stream anchors values]
-                                            (bolt-emit stream anchors values task))
-                                          (^void ack [this ^Tuple tuple]
-                                            (let [^TupleImpl tuple tuple
-                                                  ack-val (.getAckVal tuple)]
-                                              (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
-                                                             (send-unanchored task-data
-                                                                              Acker/ACKER_ACK_STREAM_ID
-                                                                              [root (bit-xor id ack-val)]
-                                                                              transfer-fn)))
-                                            (let [delta (tuple-time-delta! tuple)
-                                                  debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                                              (when debug?
-                                                (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
-                                              (when delta
-                                                (.boltAckedTuple executor-stats
-                                                                         (.getSourceComponent tuple)
-                                                                         (.getSourceStreamId tuple)
-                                                                         delta))))
-                                          (^void fail [this ^Tuple tuple]
-                                            (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (send-unanchored task-data
-                                                                                  Acker/ACKER_FAIL_STREAM_ID
-                                                                                  [root]
-                                                                                  transfer-fn))
-                                            (let [delta (tuple-time-delta! tuple)
-                                                  debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                                              (when debug?
-                                                (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                                              (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
-                                              (when delta
-                                                (.boltFailedTuple executor-stats
-                                                                          (.getSourceComponent tuple)
-                                                                          (.getSourceStreamId tuple)
-                                                                          delta))))
-                                          (^void resetTimeout [this ^Tuple tuple]
-                                            (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                                            (send-unanchored task-data
-                                                                                  Acker/ACKER_RESET_TIMEOUT_STREAM_ID
-                                                                                  [root]
-                                                                                  transfer-fn)))
-                                          (reportError [this error]
-                                            (report-error error))))))
-                           (reset! open-or-prepare-was-called? true)
-                           (log-message "Prepared bolt " component-id ":" (keys task-datas))
-                           (setup-metrics! executor-data)
-
-                           (let [receive-queue (:receive-queue executor-data)
-                                 event-handler (mk-task-receiver executor-data tuple-action-fn)]
-                             (fn []
-                               (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
-                               0)))]
-    ;; TODO: can get any SubscribedState objects out of the context now
-
-    [(Utils/asyncLoop
-      bolt-transfer-fn
-      false ; isDaemon
-      (:report-error-and-die executor-data)
-      Thread/NORM_PRIORITY
-      true ; isFactory
-      true ; startImmediately
-      (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defmethod close-component :spout [executor-data spout]
-  (.close spout))
-
-(defmethod close-component :bolt [executor-data bolt]
-  (.cleanup bolt))
-
-;; TODO: refactor this to be part of an executor-specific map
-(defmethod mk-executor-stats :spout [_ rate]
-  (SpoutExecutorStats. rate))
-
-(defmethod mk-executor-stats :bolt [_ rate]
-  (BoltExecutorStats. rate))

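A note on the ack arithmetic being removed above: put-xor!, (Utils/bitXorVals out-ids), and the (bit-xor id ack-val) in the bolt's ack path all lean on one identity. XOR-ing an id into a running value twice cancels it, so the pending value for a spout tuple's tree returns to zero exactly when every anchored edge id has been XOR-ed in once at emit time and once at ack time. A minimal, self-contained Java sketch of that algebra (plain HashMap and longs; the names are illustrative stand-ins, not Storm's classes):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    // Illustrative sketch of XOR-based ack tracking, not Storm's implementation.
    public class XorAckSketch {
        // mirrors put-xor! above: (bit-xor curr id) with a default of 0
        static void putXor(Map<Long, Long> pending, long key, long id) {
            long curr = pending.getOrDefault(key, 0L);
            pending.put(key, curr ^ id);
        }

        public static void main(String[] args) {
            Random rand = new Random();
            Map<Long, Long> pending = new HashMap<>();
            long rootId = rand.nextLong(); // id of the spout tuple's tree
            long edge1 = rand.nextLong();  // ids of two anchored downstream tuples
            long edge2 = rand.nextLong();

            putXor(pending, rootId, edge1); // emits: XOR each new edge id in
            putXor(pending, rootId, edge2);
            putXor(pending, rootId, edge1); // acks: XOR the same ids back in
            putXor(pending, rootId, edge2); // pairs cancel out

            System.out.println(pending.get(rootId) == 0L); // prints: true
        }
    }

This is also why the spout only sends (Utils/bitXorVals out-ids) on ACKER_INIT_STREAM_ID: the acker keeps just the running XOR, never the set of outstanding ids.
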
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
new file mode 100644
index 0000000..1e46e37
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
@@ -0,0 +1,42 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-executor
+  (:use [org.apache.storm util config log])
+  (:import [org.apache.storm.tuple AddressedTuple]
+           [org.apache.storm.executor Executor ExecutorTransfer])
+  (:import [org.apache.storm.utils DisruptorQueue])
+  (:import [org.apache.storm Config Constants]))
+
+(defn local-transfer-executor-tuple []
+  (fn [task tuple batch-transfer->worker]
+    (let [val (AddressedTuple. task tuple)]
+      (.publish ^DisruptorQueue batch-transfer->worker val))))
+
+(defn mk-local-executor-transfer [worker-topology-context batch-queue storm-conf transfer-fn]
+  (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf transfer-fn]
+    (transfer [task tuple]
+      (let [batch-transfer->worker (.getBatchTransferQueue this)]
+        ((local-transfer-executor-tuple) task tuple batch-transfer->worker)))))
+
+(defn mk-local-executor [workerData executorId credentials]
+  (let [executor (Executor/mkExecutor workerData executorId credentials)
+        worker-topology-context (.getWorkerTopologyContext executor)
+        batch-transfer-queue (.getTransferWorkerQueue executor)
+        storm-conf (.getStormConf executor)
+        transfer-fn (.getTransferFn executor)
+        local-executor-transfer (mk-local-executor-transfer worker-topology-context batch-transfer-queue storm-conf transfer-fn)]
+    (.setLocalExecutorTransfer executor local-executor-transfer)
+    (.execute executor)))
\ No newline at end of file

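For context on the new file above: in local mode the proxied ExecutorTransfer overrides transfer so each tuple is wrapped with its destination task id and published straight onto the executor's batch queue, bypassing the remote path. A rough Java sketch of the same override pattern (all names are stand-ins, with a BlockingQueue playing the DisruptorQueue's role):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Stand-in for AddressedTuple: a tuple tagged with its destination task.
    class Addressed {
        final int destTask;
        final Object tuple;
        Addressed(int destTask, Object tuple) { this.destTask = destTask; this.tuple = tuple; }
    }

    class BaseTransfer {
        protected final BlockingQueue<Addressed> batchQueue = new LinkedBlockingQueue<>();
        public void transfer(int task, Object tuple) {
            // remote path (serialize, route to another worker) elided in this sketch
        }
    }

    // The local-mode override: publish in-process, no serialization.
    class LocalTransfer extends BaseTransfer {
        @Override
        public void transfer(int task, Object tuple) {
            batchQueue.offer(new Addressed(task, tuple));
        }
    }

    class LocalTransferDemo {
        public static void main(String[] args) {
            LocalTransfer t = new LocalTransfer();
            t.transfer(3, "hello");
            System.out.println(t.batchQueue.size()); // prints: 1
        }
    }

Keeping local-transfer-executor-tuple a separate, re-bindable fn also matters for the testing.clj hunk further down, which redefs it to count transferred tuples.
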
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index dddce68..781558c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -18,11 +18,13 @@
   (:use [org.apache.storm config log util converter local-state-converter])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
-  (:require [org.apache.storm.daemon [executor :as executor]])
 
   (:require [clojure.set :as set])
+  (:require [org.apache.storm.daemon
+               [local-executor :as local-executor]])
   (:import [java.io File]
-           [org.apache.storm.stats StatsUtil])
+           [org.apache.storm.stats StatsUtil]
+           [java.util.concurrent.atomic AtomicBoolean AtomicReference])
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -48,10 +50,9 @@
   (:import [org.apache.logging.log4j.core.config LoggerConfig])
   (:import [org.apache.storm.generated LogConfig LogLevelAction])
   (:import [org.apache.storm StormTimer])
+  (:import [org.apache.storm.executor Executor])
   (:gen-class))
 
-(defmulti mk-suicide-fn cluster-mode)
-
 (defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
   (log-message "Reading Assignments.")
   (let [assignment (:executor->node+port (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)))]
@@ -69,7 +70,7 @@
   (let [stats (if-not executors
                   (StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
                   (StatsUtil/convertExecutorZkHbs (->> executors
-                    (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
+                    (map (fn [e] {(.getExecutorId e) (.renderStats e)}))
                     (apply merge))))
         zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime worker) upTime))]
     ;; do the zookeeper heartbeat
@@ -96,7 +97,7 @@
 (defn worker-outbound-tasks
   "Returns seq of task-ids that receive messages from this worker"
   [worker]
-  (let [context (worker-context worker)
+  (let [context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
         components (mapcat
                      (fn [task-id]
                        (->> (.getComponentId context (int task-id))
@@ -145,11 +146,11 @@
             assignment-id (:assignment-id worker)
             port (:port worker)
             storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)
+            prev-backpressure-flag (.get (:backpressure worker))
             ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
             curr-backpressure-flag (if executors
                                      (or (.getThrottleOn (:transfer-queue worker))
-                                       (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
+                                       (reduce #(or %1 %2) (map #(.getBackPressureFlag %1) executors)))
                                      prev-backpressure-flag)]
         ;; update the worker's backpressure flag to zookeeper only when it has changed
         (when (not= prev-backpressure-flag curr-backpressure-flag)
@@ -157,7 +158,7 @@
             (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
             (.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag)
             ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-            (reset! (:backpressure worker) curr-backpressure-flag)
+            (.set (:backpressure worker) curr-backpressure-flag)
             (catch Exception exc
               (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
         ))))
@@ -276,7 +277,7 @@
         mq-context  (if mq-context
                       mq-context
                       (TransportFactory/makeContext storm-conf))]
-
+    ;; TODO: when translating this function, use constants defined in Constants.java
     (recursive-map
       :conf conf
       :mq-context mq-context
@@ -290,9 +291,9 @@
       ;; when the worker boots up, it starts setting up initial connections to
       ;; other workers. Once all connections are ready, we enable this flag
       ;; and the spouts and bolts are activated.
-      :worker-active-flag (atom false)
-      :storm-active-atom (atom false)
-      :storm-component->debug-atom (atom {})
+      :worker-active-flag (atom false) ;; used in worker only, keep it as atom
+      :storm-active-atom (AtomicBoolean. false)
+      :storm-component->debug-atom (AtomicReference.)
       :executors executors
       :task-ids (->> receive-queue-map keys (map int) sort)
       :storm-conf storm-conf
@@ -321,7 +322,7 @@
                                  (mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
                                  (into {})
                                  (HashMap.))
-      :suicide-fn (mk-suicide-fn conf)
+      :suicide-fn (Utils/mkSuicideFn)
       :uptime (Utils/makeUptimeComputer)
       :default-shared-resources (mk-default-resources <>)
       :user-shared-resources (mk-user-resources <>)
@@ -329,10 +330,10 @@
       :transfer-fn (mk-transfer-fn <>)
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
-      :backpressure (atom false) ;; whether this worker is going slow
-      :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
-      :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
-      :throttle-on (atom false) ;; whether throttle is activated for spouts
+      :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
+      :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
+      :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+      :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
       )))
 
 (defn- endpoint->string [[node port]]
@@ -422,6 +423,7 @@
               current-connections (set (keys @(:cached-node+port->socket worker)))
               new-connections (set/difference needed-connections current-connections)
               remove-connections (set/difference current-connections needed-connections)]
+
               (swap! (:cached-node+port->socket worker)
                      #(HashMap. (merge (into {} %1) %2))
                      (into {}
@@ -455,11 +457,9 @@
                  (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
   ([worker callback]
     (let [base (clojurify-storm-base (.stormBase (:storm-cluster-state worker) (:storm-id worker) callback))]
-      (reset!
-        (:storm-active-atom worker)
-        (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
-      (reset! (:storm-component->debug-atom worker) (-> base :component->debug))
-      (log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
+      (.set (:storm-active-atom worker) (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
+      (.set (:storm-component->debug-atom worker) (map-val thriftify-debugoptions (-> base :component->debug)))
+      (log-debug "Event debug options " (.get (:storm-component->debug-atom worker))))))
 
 ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
 (defn mk-transfer-tuples-handler [worker]
@@ -514,7 +514,7 @@
         ^IConnection socket (:receiver worker)]
     (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
     (.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker)
-                                                            (worker-context worker)
+                                                            (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
                                                             transfer-local-fn))))
 
 (defn- close-resources [worker]
@@ -605,7 +605,7 @@
 (defn run-worker-start-hooks [worker]
   (let [topology (:topology worker)
         topo-conf (:storm-conf worker)
-        worker-topology-context (worker-context worker)
+        worker-topology-context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
         hooks (.get_worker_hooks topology)]
     (dofor [hook hooks]
       (let [hook-bytes (Utils/toByteArray hook)
@@ -680,7 +680,9 @@
 
         _ (run-worker-start-hooks worker)
 
-        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
+        _ (if (ConfigUtils/isLocalMode storm-conf)
+            (reset! executors (dofor [e (:executors worker)] (local-executor/mk-local-executor worker e initial-credentials)))
+            (reset! executors (dofor [e (:executors worker)] (.execute (Executor/mkExecutor worker e initial-credentials)))))
 
         transfer-tuples (mk-transfer-tuples-handler worker)
         
@@ -699,7 +701,7 @@
             (.start backpressure-thread))
         callback (fn cb []
                    (let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
-                     (reset! (:throttle-on worker) throttle-on)))
+                     (.set (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
             (.topologyBackpressure storm-cluster-state storm-id callback))
 
@@ -766,14 +768,14 @@
                                     (let [new-creds (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id nil))]
                                       (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
                                         (AuthUtils/updateSubject subject auto-creds new-creds)
-                                        (dofor [e @executors] (.credentials-changed e new-creds))
+                                        (dofor [e @executors] (.credenetialsChanged e new-creds))
                                         (reset! credentials new-creds))))
        check-throttle-changed (fn []
                                 (let [callback (fn cb []
                                                  (let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
-                                                   (reset! (:throttle-on worker) throttle-on)))
+                                                   (.set (:throttle-on worker) throttle-on)))
                                       new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
-                                    (reset! (:throttle-on worker) new-throttle-on)))
+                                    (.set (:throttle-on worker) new-throttle-on)))
         check-log-config-changed (fn []
                                   (let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
                                     (process-log-config-change latest-log-config original-log-levels log-config)
@@ -809,14 +811,6 @@
     ret
     ))))))
 
-(defmethod mk-suicide-fn
-  :local [conf]
-  (fn [] (Utils/exitProcess 1 "Worker died")))
-
-(defmethod mk-suicide-fn
-  :distributed [conf]
-  (fn [] (Utils/exitProcess 1 "Worker died")))
-
 (defn -main [storm-id assignment-id port-str worker-id]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
     (Utils/setupDefaultUncaughtExceptionHandler)

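One pattern in the worker.clj hunk above is worth calling out: flags shared with executors move from Clojure atoms to java.util.concurrent.atomic types (:storm-active-atom, :backpressure, :backpressure-trigger, :throttle-on), presumably so the new Java Executor can read and write them without going through Clojure's Atom. Call sites change mechanically from @flag and (reset! flag v) to (.get flag) and (.set flag v). A tiny Java sketch of the shared-flag style being adopted (names are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    // A JVM-native flag both the Clojure worker and the Java executor can share.
    public class ThrottleFlagSketch {
        public static void main(String[] args) throws InterruptedException {
            AtomicBoolean throttleOn = new AtomicBoolean(false);
            // stands in for the ZK backpressure callback flipping the flag
            Thread callback = new Thread(() -> throttleOn.set(true));
            callback.start();
            callback.join();
            System.out.println(throttleOn.get()); // prints: true
        }
    }
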
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index d67b48d..dc676d6 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -17,15 +17,17 @@
 (ns org.apache.storm.testing
   (:require [org.apache.storm.daemon
              [nimbus :as nimbus]
+             [local-executor :as local-executor]
              [local-supervisor :as local-supervisor]
              [common :as common]
-             [worker :as worker]
-             [executor :as executor]])
+             [worker :as worker]])
   (:import [org.apache.commons.io FileUtils]
            [org.apache.storm.utils]
            [org.apache.storm.zookeeper Zookeeper]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
+           [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+           [org.apache.storm.executor Executor]
+           [java.util.concurrent.atomic AtomicBoolean])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -697,12 +699,12 @@
          ;; of tuple emission (and not on a separate thread later) for
          ;; topologies to be tracked correctly. This is because "transferred" *must*
          ;; be incremented before "processing".
-         executor/mk-executor-transfer-fn
-         (let [old# executor/mk-executor-transfer-fn]
+         local-executor/local-transfer-executor-tuple
+         (let [old# local-executor/local-transfer-executor-tuple]
            (fn [& args#]
              (let [transferrer# (apply old# args#)]
                (fn [& args2#]
-                 ;; (log-message "Transferring: " transfer-args#)
+                 ;; (log-message "Transferring: " args2#)
                  (increment-global! id# "transferred" 1)
                  (apply transferrer# args2#)))))]
           (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
@@ -759,7 +761,7 @@
                   {}
                   (HashMap.)
                   (HashMap.)
-                  (atom false))]
+                  (AtomicBoolean. false))]
     (TupleImpl. context values 1 stream)))
 
 (defmacro with-timeout

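The (atom false) -> (AtomicBoolean. false) swap here mirrors the rest of the port: flags shared between the remaining Clojure code and the new Java executor become java.util.concurrent AtomicBooleans, mutated through (.get flag)/(.set flag ...) interop instead of deref/reset!. This particular flag is the open-or-prepare-was-called? marker that TopologyContext carries (see the Task.java changes below). A sketch of the Java-side usage, assuming the flag guards a one-time open/prepare call as its name suggests:

    import java.util.concurrent.atomic.AtomicBoolean;

    class OpenOnce {
        // Built as (AtomicBoolean. false) on the Clojure side.
        private final AtomicBoolean openOrPrepareWasCalled = new AtomicBoolean(false);

        void openIfNeeded(Runnable open) {
            // compareAndSet keeps the one-shot semantics the old atom had,
            // without clojure.lang.Atom leaking into Java signatures.
            if (openOrPrepareWasCalled.compareAndSet(false, true)) {
                open.run();
            }
        }
    }
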
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Constants.java b/storm-core/src/jvm/org/apache/storm/Constants.java
index b2c642e..9436db1 100644
--- a/storm-core/src/jvm/org/apache/storm/Constants.java
+++ b/storm-core/src/jvm/org/apache/storm/Constants.java
@@ -22,7 +22,7 @@ import clojure.lang.RT;
 
 
 public class Constants {
-    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; 
+    public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
 
     public static final long SYSTEM_TASK_ID = -1;
     public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,5 +32,27 @@ public class Constants {
     public static final String METRICS_STREAM_ID = "__metrics";
     public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
     public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
+
+    public static final Object TOPOLOGY = "topology";
+    public static final String SYSTEM_TOPOLOGY = "system-topology";
+    public static final String STORM_CONF = "storm-conf";
+    public static final String STORM_ID = "storm-id";
+    public static final String WORKER_ID = "worker-id";
+    public static final String CONF = "conf";
+    public static final String PORT = "port";
+    public static final String TASK_TO_COMPONENT = "task->component";
+    public static final String COMPONENT_TO_SORTED_TASKS = "component->sorted-tasks";
+    public static final String COMPONENT_TO_STREAM_TO_FIELDS = "component->stream->fields";
+    public static final String TASK_IDS = "task-ids";
+    public static final String DEFAULT_SHARED_RESOURCES = "default-shared-resources";
+    public static final String USER_SHARED_RESOURCES = "user-shared-resources";
+    public static final String USER_TIMER = "user-timer";
+    public static final String TRANSFER_FN = "transfer-fn";
+    public static final String SUICIDE_FN = "suicide-fn";
+    public static final String THROTTLE_ON = "throttle-on";
+    public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
+    public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
+    public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
+    public static final Object LOAD_MAPPING = "load-mapping";
 }
     

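The string constants added above name the entries of the worker data map, which stays untyped during the port; the Java side reads it with casts keyed by these constants. A sketch mirroring the reads that makeWorkerContext and Task.mkTopologyContext perform later in this patch (workerData is assumed to be the Map<String, Object> handed to the executor):

    String stormId = (String) workerData.get(Constants.STORM_ID);
    Integer port = (Integer) workerData.get(Constants.PORT);
    Map<Integer, String> taskToComponent =
            (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
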

[2/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index cfc435f..391c2d3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -39,6 +39,8 @@ import org.apache.storm.metric.filter.FilterByMetricName;
 import org.apache.storm.metric.util.DataPointExpander;
 import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.task.IBolt;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.IPredicate;
 import org.apache.storm.utils.ThriftTopologyUtils;
@@ -47,6 +49,7 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -65,6 +68,7 @@ public class StormCommon {
      * Provide an instance of this class for delegates to use.  To mock out
      * delegated methods, provide an instance of a subclass that overrides the
      * implementation of the delegated method.
+     *
      * @param common a StormCommon instance
      * @return the previously set instance
      */
@@ -90,6 +94,7 @@ public class StormCommon {
     public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
     public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
 
+    @SuppressWarnings("unchecked")
     public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
         List<String> activeTopologys = stormClusterState.activeStorms();
         IPredicate pred = new IPredicate<String>() {
@@ -108,7 +113,7 @@ public class StormCommon {
 
     protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
         List<String> activeTopologys = stormClusterState.activeStorms();
-        Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
+        Map<String, StormBase> stormBases = new HashMap<>();
         for (String topologyId : activeTopologys) {
             StormBase base = stormClusterState.stormBase(topologyId, null);
             stormBases.put(topologyId, base);
@@ -122,11 +127,12 @@ public class StormCommon {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private static void validateIds(StormTopology topology) throws InvalidTopologyException {
-        List<String> componentIds = new ArrayList<String>();
+        List<String> componentIds = new ArrayList<>();
 
         for (StormTopology._Fields field : Thrift.getTopologyFields()) {
-            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+            if (!ThriftTopologyUtils.isWorkerHook(field)) {
                 Object value = topology.getFieldValue(field);
                 Map<String, Object> componentMap = (Map<String, Object>) value;
                 componentIds.addAll(componentMap.keySet());
@@ -149,7 +155,7 @@ public class StormCommon {
         }
 
         List<String> offending = Utils.getRepeat(componentIds);
-        if (offending.isEmpty() == false) {
+        if (!offending.isEmpty()) {
             throw new InvalidTopologyException("Duplicate component ids: " + offending);
         }
     }
@@ -162,19 +168,21 @@ public class StormCommon {
         }
     }
 
+    @SuppressWarnings("unchecked")
     public static Map<String, Object> allComponents(StormTopology topology) {
-        Map<String, Object> components = new HashMap<String, Object>();
+        Map<String, Object> components = new HashMap<>();
         List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
         for (StormTopology._Fields field : topologyFields) {
-            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+            if (!ThriftTopologyUtils.isWorkerHook(field)) {
                 components.putAll(((Map) topology.getFieldValue(field)));
             }
         }
         return components;
     }
 
+    @SuppressWarnings("unchecked")
     public static Map componentConf(Object component) {
-        Map<Object, Object> conf = new HashMap<Object, Object>();
+        Map<Object, Object> conf = new HashMap<>();
         ComponentCommon common = getComponentCommon(component);
         String jconf = common.get_json_conf();
         if (jconf != null) {
@@ -183,6 +191,7 @@ public class StormCommon {
         return conf;
     }
 
+    @SuppressWarnings("unchecked")
     public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
         validateIds(topology);
 
@@ -191,7 +200,7 @@ public class StormCommon {
             if (spoutComponents != null) {
                 for (Object obj : spoutComponents.values()) {
                     ComponentCommon common = getComponentCommon(obj);
-                    if (isEmptyInputs(common) == false) {
+                    if (!isEmptyInputs(common)) {
                         throw new InvalidTopologyException("May not declare inputs for a spout");
                     }
                 }
@@ -211,7 +220,7 @@ public class StormCommon {
     }
 
     private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
-        Set<String> outputFields = new HashSet<String>();
+        Set<String> outputFields = new HashSet<>();
         for (StreamInfo streamInfo : streams.values()) {
             outputFields.addAll(streamInfo.get_output_fields());
         }
@@ -227,24 +236,27 @@ public class StormCommon {
             for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
                 String sourceStreamId = input.getKey().get_streamId();
                 String sourceComponentId = input.getKey().get_componentId();
-                if(componentMap.keySet().contains(sourceComponentId) == false) {
-                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
+                if (!componentMap.keySet().contains(sourceComponentId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId +
+                            "] subscribes from non-existent component [" + sourceComponentId + "]");
                 }
 
                 ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
-                if (sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
-                    throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
+                if (!sourceComponent.get_streams().containsKey(sourceStreamId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId +
+                            "] subscribes from non-existent stream: " +
                             "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
                 }
 
                 Grouping grouping = input.getValue();
                 if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
-                    List<String> fields = new ArrayList<String>(grouping.get_fields());
+                    List<String> fields = new ArrayList<>(grouping.get_fields());
                     Map<String, StreamInfo> streams = sourceComponent.get_streams();
                     Set<String> sourceOutputFields = getStreamOutputFields(streams);
                     fields.removeAll(sourceOutputFields);
                     if (fields.size() != 0) {
-                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId  +"] of component " +
+                        throw new InvalidTopologyException("Component: [" + componentId +
+                                "] subscribes from stream: [" + sourceStreamId + "] of component " +
                                 "[" + sourceComponentId + "] + with non-existent fields: " + fields);
                     }
                 }
@@ -253,18 +265,22 @@ public class StormCommon {
     }
 
     public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
-        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
         Set<String> boltIds = topology.get_bolts().keySet();
         Set<String> spoutIds = topology.get_spouts().keySet();
 
-        for(String id : spoutIds) {
-            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        for (String id : spoutIds) {
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
         }
 
-        for(String id : boltIds) {
-            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
-            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
-            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        for (String id : boltIds) {
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("id")));
         }
         return inputs;
     }
@@ -272,10 +288,12 @@ public class StormCommon {
     public static IBolt makeAckerBolt() {
         return _instance.makeAckerBoltImpl();
     }
+
     public IBolt makeAckerBoltImpl() {
         return new Acker();
     }
 
+    @SuppressWarnings("unchecked")
     public static void addAcker(Map conf, StormTopology topology) {
         int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
         Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
@@ -285,13 +303,13 @@ public class StormCommon {
         outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
         outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
 
-        Map<String, Object> ackerConf = new HashMap<String, Object>();
+        Map<String, Object> ackerConf = new HashMap<>();
         ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
         ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
 
         Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
 
-        for(Bolt bolt : topology.get_bolts().values()) {
+        for (Bolt bolt : topology.get_bolts().values()) {
             ComponentCommon common = bolt.get_common();
             common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
             common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
@@ -301,12 +319,17 @@ public class StormCommon {
         for (SpoutSpec spout : topology.get_spouts().values()) {
             ComponentCommon common = spout.get_common();
             Map spoutConf = componentConf(spout);
-            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                    Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
             common.set_json_conf(JSONValue.toJSONString(spoutConf));
-            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
-            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
-            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
-            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareDirectGrouping());
+            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
+                    Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
         }
 
         topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
@@ -341,9 +364,8 @@ public class StormCommon {
     }
 
     public static List<String> eventLoggerBoltFields() {
-        List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
-                EventLoggerBolt.FIELD_VALUES);
-        return fields;
+        return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
+                EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
     }
 
     public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
@@ -352,34 +374,38 @@ public class StormCommon {
         allIds.addAll(topology.get_bolts().keySet());
         allIds.addAll(topology.get_spouts().keySet());
 
-        for(String id : allIds) {
-            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+        for (String id : allIds) {
+            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
+                    Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
         }
         return inputs;
     }
 
     public static void addEventLogger(Map conf, StormTopology topology) {
-        Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
-        HashMap<String, Object> componentConf = new HashMap<String, Object>();
+        Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+                Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        HashMap<String, Object> componentConf = new HashMap<>();
         componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
         componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
-        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
+                eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
 
-        for(Object component : allComponents(topology).values()) {
+        for (Object component : allComponents(topology).values()) {
             ComponentCommon common = getComponentCommon(component);
             common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
         }
         topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
     }
 
+    @SuppressWarnings("unchecked")
     public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
-        Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
+        Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
 
-        Set<String> componentIdsEmitMetrics = new HashSet<String>();
+        Set<String> componentIdsEmitMetrics = new HashSet<>();
         componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
         componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
 
-        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
         for (String componentId : componentIdsEmitMetrics) {
             inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
         }
@@ -433,13 +459,14 @@ public class StormCommon {
         }
     }
 
+    @SuppressWarnings("unused")
     public static void addSystemComponents(Map conf, StormTopology topology) {
-        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+        Map<String, StreamInfo> outputStreams = new HashMap<>();
         outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
         outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
         outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
 
-        Map<String, Object> boltConf = new HashMap<String, Object>();
+        Map<String, Object> boltConf = new HashMap<>();
         boltConf.put(Config.TOPOLOGY_TASKS, 0);
 
         Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
@@ -468,20 +495,12 @@ public class StormCommon {
 
     public static boolean hasAckers(Map stormConf) {
         Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
-        if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
-            return true;
-        } else {
-            return false;
-        }
+        return ackerNum == null || Utils.getInt(ackerNum) > 0;
     }
 
     public static boolean hasEventLoggers(Map stormConf) {
         Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
-        if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
-            return true;
-        } else {
-            return false;
-        }
+        return eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0;
     }
 
     public static int numStartExecutors(Object component) throws InvalidTopologyException {
@@ -492,15 +511,16 @@ public class StormCommon {
     public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
         return _instance.stormTaskInfoImpl(userTopology, stormConf);
     }
+
     /*
      * Returns map from task -> componentId
      */
     protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
-        Map<Integer, String> taskIdToComponentId = new HashMap<Integer, String>();
+        Map<Integer, String> taskIdToComponentId = new HashMap<>();
 
         StormTopology systemTopology = systemTopology(stormConf, userTopology);
         Map<String, Object> components = allComponents(systemTopology);
-        Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
+        Map<String, Integer> componentIdToTaskNum = new TreeMap<>();
         for (Map.Entry<String, Object> entry : components.entrySet()) {
             Map conf = componentConf(entry.getValue());
             Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
@@ -521,7 +541,7 @@ public class StormCommon {
     }
 
     public static List<Integer> executorIdToTasks(List<Long> executorId) {
-        List<Integer> taskIds = new ArrayList<Integer>();
+        List<Integer> taskIds = new ArrayList<>();
         int taskId = executorId.get(0).intValue();
         while (taskId <= executorId.get(1).intValue()) {
             taskIds.add(taskId);
@@ -530,22 +550,24 @@ public class StormCommon {
         return taskIds;
     }
 
-    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodeport) {
-        Map<Integer, NodeInfo> tasksToNodeport = new HashMap<Integer, NodeInfo>();
-        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodeport.entrySet()) {
+    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
+        Map<Integer, NodeInfo> tasksToNodePort = new HashMap<>();
+        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             List<Integer> taskIds = executorIdToTasks(entry.getKey());
             for (Integer taskId : taskIds) {
-                tasksToNodeport.put(taskId, entry.getValue());
+                tasksToNodePort.put(taskId, entry.getValue());
             }
         }
-        return tasksToNodeport;
+        return tasksToNodePort;
     }
 
-    public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf)
+            throws IllegalAccessException, InstantiationException, ClassNotFoundException {
         return _instance.mkAuthorizationHandlerImpl(klassName, conf);
     }
 
-    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
         IAuthorizer aznHandler = null;
         if (klassName != null) {
             Class aznClass = Class.forName(klassName);
@@ -554,10 +576,35 @@ public class StormCommon {
                 if (aznHandler != null) {
                     aznHandler.prepare(conf);
                 }
-                LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+                LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
             }
         }
 
         return aznHandler;
     }
+
+    @SuppressWarnings("unchecked")
+    public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+        try {
+            StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+            Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+            Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+            Map<String, List<Integer>> componentToSortedTasks =
+                    (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
+            Map<String, Map<String, Fields>> componentToStreamToFields =
+                    (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+            String stormId = (String) workerData.get(Constants.STORM_ID);
+            Map conf = (Map) workerData.get(Constants.CONF);
+            Integer port = (Integer) workerData.get(Constants.PORT);
+            String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+            String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+            List<Integer> workerTasks = (List<Integer>) workerData.get(Constants.TASK_IDS);
+            Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+            Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
+            return new WorkerTopologyContext(stormTopology, stormConf, taskToComponent, componentToSortedTasks,
+                    componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
 }

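Of the helpers above, executorIdToTasks is worth a concrete example: an executor id is an inclusive [first-task last-task] pair, and the method expands it into the individual task ids. A small usage sketch:

    // Executor id [4 7] covers tasks 4 through 7 inclusive.
    List<Integer> tasks = StormCommon.executorIdToTasks(Arrays.asList(4L, 7L));
    // tasks == [4, 5, 6, 7]; taskToNodeport uses this expansion to map
    // every task of an executor to the same NodeInfo.
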
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Task.java b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
index 60b570a..2e846ff 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
@@ -18,9 +18,11 @@
 package org.apache.storm.daemon;
 
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.Thrift;
 import org.apache.storm.daemon.metrics.BuiltinMetrics;
 import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.JavaObject;
@@ -32,7 +34,6 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.hooks.info.EmitInfo;
-import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.spout.ShellSpout;
 import org.apache.storm.stats.CommonStats;
 import org.apache.storm.task.ShellBolt;
@@ -57,7 +58,7 @@ public class Task {
 
     private static final Logger LOG = LoggerFactory.getLogger(Task.class);
 
-    private Map executorData;
+    private Executor executor;
     private Map workerData;
     private TopologyContext systemTopologyContext;
     private TopologyContext userTopologyContext;
@@ -65,7 +66,7 @@ public class Task {
     private LoadMapping loadMapping;
     private Integer taskId;
     private String componentId;
-    private Object taskObject;  // Spout/Bolt object
+    private Object taskObject; // Spout/Bolt object
     private Map stormConf;
     private Callable<Boolean> emitSampler;
     private CommonStats executorStats;
@@ -73,20 +74,20 @@ public class Task {
     private BuiltinMetrics builtInMetrics;
     private boolean debug;
 
-    public Task(Map executorData, Integer taskId) throws IOException {
+    public Task(Executor executor, Integer taskId) throws IOException {
         this.taskId = taskId;
-        this.executorData = executorData;
-        this.workerData = (Map) executorData.get("worker");
-        this.stormConf = (Map) executorData.get("storm-conf");
-        this.componentId = (String) executorData.get("component-id");
-        this.streamComponentToGrouper = (Map<String, Map<String, LoadAwareCustomStreamGrouping>>) executorData.get("stream->component->grouper");
-        this.executorStats = (CommonStats) executorData.get("stats");
-        this.builtInMetrics = BuiltinMetricsUtil.mkData((String) executorData.get("type"), this.executorStats);
-        this.workerTopologyContext = (WorkerTopologyContext) executorData.get("worker-context");
+        this.executor = executor;
+        this.workerData = executor.getWorkerData();
+        this.stormConf = executor.getStormConf();
+        this.componentId = executor.getComponentId();
+        this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+        this.executorStats = executor.getStats();
+        this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
+        this.workerTopologyContext = executor.getWorkerTopologyContext();
         this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
-        this.loadMapping = (LoadMapping) workerData.get("load-mapping");
-        this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get("system-topology"));
-        this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get("topology"));
+        this.loadMapping = (LoadMapping) workerData.get(Constants.LOAD_MAPPING);
+        this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY));
+        this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get(Constants.TOPOLOGY));
         this.taskObject = mkTaskObject();
         this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
         this.addTaskHooks();
@@ -119,13 +120,14 @@ public class Task {
         if (null != outTaskId) {
             return Collections.singletonList(outTaskId);
         }
-        return null;
+        return new ArrayList<>(0);
     }
 
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting: {} {} {}", componentId, stream, values);
         }
+
         List<Integer> outTasks = new ArrayList<>();
         if (!streamComponentToGrouper.containsKey(stream)) {
             throw new IllegalArgumentException("Unknown stream ID: " + stream);
@@ -164,7 +166,7 @@ public class Task {
         return componentId;
     }
 
-    public TopologyContext getUserContext() throws IOException {
+    public TopologyContext getUserContext() {
         return userTopologyContext;
     }
 
@@ -177,25 +179,25 @@ public class Task {
     }
 
     private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
-        Map conf = (Map) workerData.get("conf");
+        Map conf = (Map) workerData.get(Constants.CONF);
         return new TopologyContext(
-            topology,
-            (Map) workerData.get("storm-conf"),
-            (Map<Integer, String>) workerData.get("task->component"),
-            (Map<String, List<Integer>>) workerData.get("component->sorted-tasks"),
-            (Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"),
-            (String) workerData.get("storm-id"),
-            ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))),
-            ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")),
-            taskId,
-            (Integer) workerData.get("port"),
-            (List<Integer>) workerData.get("task-ids"),
-            (Map<String, Object>) workerData.get("default-shared-resources"),
-            (Map<String, Object>) workerData.get("user-shared-resources"),
-            (Map<String, Object>) executorData.get("shared-executor-data"),
-            (Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"),
-            (clojure.lang.Atom) executorData.get("open-or-prepare-was-called?")
-        );
+                topology,
+                (Map) workerData.get(Constants.STORM_CONF),
+                (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT),
+                (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS),
+                (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS),
+                (String) workerData.get(Constants.STORM_ID),
+                ConfigUtils.supervisorStormResourcesPath(
+                        ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get(Constants.STORM_ID))),
+                ConfigUtils.workerPidsRoot(conf, (String) workerData.get(Constants.WORKER_ID)),
+                taskId,
+                (Integer) workerData.get(Constants.PORT),
+                (List<Integer>) workerData.get(Constants.TASK_IDS),
+                (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES),
+                (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES),
+                executor.getSharedExecutorData(),
+                executor.getIntervalToTaskToMetricToRegistry(),
+                executor.getOpenOrPrepareWasCalled());
     }
 
     private Object mkTaskObject() {
@@ -203,8 +205,8 @@ public class Task {
         Map<String, SpoutSpec> spouts = topology.get_spouts();
         Map<String, Bolt> bolts = topology.get_bolts();
         Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
-        Object result = null;
-        ComponentObject componentObject = null;
+        Object result;
+        ComponentObject componentObject;
         if (spouts.containsKey(componentId)) {
             componentObject = spouts.get(componentId).get_spout_object();
         } else if (bolts.containsKey(componentId)) {

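Note the null -> empty-list change in the getOutgoingTasks overload patched above: callers can now iterate the result unconditionally, which is what the new Executor.sendUnanchored does. A sketch of the calling pattern (variable names are illustrative):

    // Returning an empty list instead of null means emit paths need no
    // null guard: the loop simply does nothing when no task matched.
    List<Integer> outTasks = task.getOutgoingTasks(stream, values);
    for (Integer t : outTasks) {
        transfer.transfer(t, tuple);
    }
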
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/Executor.java b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
new file mode 100644
index 0000000..614d44d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+    protected final Map workerData;
+    protected final WorkerTopologyContext workerTopologyContext;
+    protected final List<Long> executorId;
+    protected final List<Integer> taskIds;
+    protected final String componentId;
+    protected final AtomicBoolean openOrPrepareWasCalled;
+    protected final Map stormConf;
+    protected final Map conf;
+    protected final String stormId;
+    protected final HashMap sharedExecutorData;
+    protected final AtomicBoolean stormActive;
+    protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+    protected final Runnable suicideFn;
+    protected final IStormClusterState stormClusterState;
+    protected final Map<Integer, String> taskToComponent;
+    protected CommonStats stats;
+    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+    protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+    protected final ReportErrorAndDie reportErrorDie;
+    protected final Callable<Boolean> sampler;
+    protected final AtomicBoolean backpressure;
+    protected ExecutorTransfer executorTransfer;
+    protected final String type;
+    protected final AtomicBoolean throttleOn;
+    protected IFn transferFn;
+
+    protected final IReportError reportError;
+    protected final Random rand;
+    protected final DisruptorQueue transferQueue;
+    protected final DisruptorQueue receiveQueue;
+    protected Map<Integer, Task> idToTask;
+    protected final Map<String, String> credentials;
+    protected final Boolean isDebug;
+    protected final Boolean hasEventLoggers;
+    protected String hostname;
+
+    protected Executor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+        this.workerData = workerData;
+        this.executorId = executorId;
+        this.workerTopologyContext = StormCommon.makeWorkerContext(workerData);
+        this.taskIds = StormCommon.executorIdToTasks(executorId);
+        this.componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+        this.openOrPrepareWasCalled = new AtomicBoolean(false);
+        this.stormConf = normalizedComponentConf((Map) workerData.get(Constants.STORM_CONF), workerTopologyContext, componentId);
+        this.receiveQueue = (DisruptorQueue) (((Map) workerData.get(Constants.EXECUTOR_RECEIVE_QUEUE_MAP)).get(executorId));
+        this.stormId = (String) workerData.get(Constants.STORM_ID);
+        this.conf = (Map) workerData.get(Constants.CONF);
+        this.sharedExecutorData = new HashMap();
+        this.stormActive = (AtomicBoolean) workerData.get(Constants.STORM_ACTIVE_ATOM);
+        this.stormComponentDebug = (AtomicReference<Map<String, DebugOptions>>) workerData.get(Constants.COMPONENT_TO_DEBUG_ATOM);
+
+        this.transferQueue = mkExecutorBatchQueue(stormConf, executorId);
+        this.transferFn = (IFn) workerData.get(Constants.TRANSFER_FN);
+        this.executorTransfer = new ExecutorTransfer(workerTopologyContext, transferQueue, stormConf, transferFn);
+
+        this.suicideFn = (Runnable) workerData.get(Constants.SUICIDE_FN);
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.get("state-store"), Utils.getWorkerACL(stormConf),
+                    new ClusterStateContext(DaemonType.WORKER));
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        StormTopology topology = workerTopologyContext.getRawTopology();
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        if (spouts.containsKey(componentId)) {
+            this.type = StatsUtil.SPOUT;
+            this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(stormConf));
+        } else if (bolts.containsKey(componentId)) {
+            this.type = StatsUtil.BOLT;
+            this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(stormConf));
+        } else {
+            throw new RuntimeException("Could not find " + componentId + " in " + topology);
+        }
+
+        this.intervalToTaskToMetricToRegistry = new HashMap<>();
+        this.taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+        this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, stormConf);
+        this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
+        this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
+        this.sampler = ConfigUtils.mkStatsSampler(stormConf);
+        this.backpressure = new AtomicBoolean(false);
+        this.throttleOn = (AtomicBoolean) workerData.get(Constants.THROTTLE_ON);
+        this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+        this.rand = new Random(Utils.secureRandomLong());
+        this.credentials = credentials;
+        this.hasEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+        try {
+            this.hostname = Utils.hostname(stormConf);
+        } catch (UnknownHostException ignored) {
+            this.hostname = "";
+        }
+    }
+
+    public static Executor mkExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+        Executor executor;
+
+        Map<String, Object> convertedWorkerData = Utils.convertClojureMapToJavaMap(workerData);
+        WorkerTopologyContext workerTopologyContext = StormCommon.makeWorkerContext(convertedWorkerData);
+        List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
+        String componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+
+        String type = getExecutorType(workerTopologyContext, componentId);
+        if (StatsUtil.SPOUT.equals(type)) {
+            executor = new SpoutExecutor(convertedWorkerData, executorId, credentials);
+            executor.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
+        } else {
+            executor = new BoltExecutor(convertedWorkerData, executorId, credentials);
+            executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
+        }
+
+        Map<Integer, Task> idToTask = new HashMap<>();
+        for (Integer taskId : taskIds) {
+            try {
+                Task task = new Task(executor, taskId);
+                executor.sendUnanchored(
+                        task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+                idToTask.put(taskId, task);
+            } catch (IOException ex) {
+                throw Utils.wrapInRuntime(ex);
+            }
+        }
+        executor.init(idToTask);
+
+        return executor;
+    }
+
+    private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
+        StormTopology topology = workerTopologyContext.getRawTopology();
+        Map<String, SpoutSpec> spouts = topology.get_spouts();
+        Map<String, Bolt> bolts = topology.get_bolts();
+        if (spouts.containsKey(componentId)) {
+            return StatsUtil.SPOUT;
+        } else if (bolts.containsKey(componentId)) {
+            return StatsUtil.BOLT;
+        } else {
+            throw new RuntimeException("Could not find " + componentId + " in " + topology);
+        }
+    }
+
+    /**
+     * separated from mkExecutor in order to replace executor transfer in executor data for testing
+     */
+    public ExecutorShutdown execute() throws Exception {
+        LOG.info("Loading executor tasks " + componentId + ":" + executorId);
+
+        registerBackpressure();
+        Utils.SmartThread systemThreads =
+                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
+
+        String handlerName = componentId + "-executor" + executorId;
+        Utils.SmartThread handlers =
+                Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
+        setupTicks(StatsUtil.SPOUT.equals(type));
+
+        LOG.info("Finished loading executor " + componentId + ":" + executorId);
+        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+    }
+
+    public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
+
+    public abstract void init(Map<Integer, Task> idToTask);
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
+        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
+        for (AddressedTuple addressedTuple : addressedTuples) {
+            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+            int taskId = addressedTuple.getDest();
+            if (isDebug) {
+                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+            }
+            if (taskId != AddressedTuple.BROADCAST_DEST) {
+                tupleActionFn(taskId, tuple);
+            } else {
+                for (Integer t : taskIds) {
+                    tupleActionFn(t, tuple);
+                }
+            }
+        }
+    }
+
+    public void metricsTick(Task taskData, TupleImpl tuple) {
+        try {
+            Integer interval = tuple.getInteger(0);
+            int taskId = taskData.getTaskId();
+            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
+            Map<String, IMetric> nameToRegistry = null;
+            if (taskToMetricToRegistry != null) {
+                nameToRegistry = taskToMetricToRegistry.get(taskId);
+            }
+            if (nameToRegistry != null) {
+                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+                        hostname, workerTopologyContext.getThisWorkerPort(),
+                        componentId, taskId, Time.currentTimeSecs(), interval);
+                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
+                for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
+                    IMetric metric = entry.getValue();
+                    Object value = metric.getValueAndReset();
+                    if (value != null) {
+                        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+                        dataPoints.add(dataPoint);
+                    }
+                }
+                if (!dataPoints.isEmpty()) {
+                    sendUnanchored(taskData, Constants.METRICS_STREAM_ID,
+                            new Values(taskInfo, dataPoints), executorTransfer);
+                }
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    protected void setupMetrics() {
+        for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
+            StormTimer timerTask = (StormTimer) workerData.get(Constants.USER_TIMER);
+            timerTask.scheduleRecurring(interval, interval, new Runnable() {
+                @Override
+                public void run() {
+                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval),
+                            (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+                    List<AddressedTuple> metricsTickTuple =
+                            Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+                    receiveQueue.publish(metricsTickTuple);
+                }
+            });
+        }
+    }
+
+    public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
+        Tuple tuple = task.getTuple(stream, values);
+        List<Integer> tasks = task.getOutgoingTasks(stream, values);
+        for (Integer t : tasks) {
+            transfer.transfer(t, tuple);
+        }
+    }
+
+    /**
+     * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+     */
+    public void sendToEventLogger(Executor executor, Task taskData, List values,
+                                  String componentId, Object messageId, Random random) {
+        Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
+        DebugOptions debugOptions = componentDebug.get(componentId);
+        if (debugOptions == null) {
+            debugOptions = componentDebug.get(executor.getStormId());
+        }
+        double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
+        if (spct > 0 && (random.nextDouble() * 100) < spct) {
+            sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
+                    new Values(componentId, messageId, System.currentTimeMillis(), values),
+                    executor.getExecutorTransfer());
+        }
+    }
+
+    private void registerBackpressure() {
+        receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
+            @Override
+            public void highWaterMark() throws Exception {
+                if (!backpressure.get()) {
+                    backpressure.set(true);
+                    LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+                    WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
+                }
+            }
+
+            @Override
+            public void lowWaterMark() throws Exception {
+                if (backpressure.get()) {
+                    backpressure.set(false);
+                    LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+                    WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
+                }
+            }
+        });
+        receiveQueue.setHighWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
+        receiveQueue.setLowWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
+        receiveQueue.setEnableBackpressure(Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false));
+    }
+
+    protected void setupTicks(boolean isSpout) {
+        final Integer tickTimeSecs = Utils.getInt(stormConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
+        boolean enableMessageTimeout = (Boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
+        if (tickTimeSecs != null) {
+            if (Utils.isSystemId(componentId) || (!enableMessageTimeout && isSpout)) {
+                LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId);
+            } else {
+                StormTimer timerTask = (StormTimer) workerData.get(Constants.USER_TIMER);
+                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable() {
+                    @Override
+                    public void run() {
+                        TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
+                                (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+                        List<AddressedTuple> tickTuple =
+                                Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+                        receiveQueue.publish(tickTuple);
+                    }
+                });
+            }
+        }
+    }
+
+    private DisruptorQueue mkExecutorBatchQueue(Map stormConf, List<Long> executorId) {
+        int sendSize = Utils.getInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+        int waitTimeOutMs = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
+        int batchSize = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
+        int batchTimeOutMs = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+        return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
+                sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+    }
+
+    /**
+     * Returns a map from stream id to component id to grouper.
+     */
+    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(
+            WorkerTopologyContext workerTopologyContext, String componentId, Map stormConf) {
+        Map<String, Map<String, LoadAwareCustomStreamGrouping>> ret = new HashMap<>();
+
+        Map<String, Map<String, Grouping>> outputGroupings = workerTopologyContext.getTargets(componentId);
+        for (Map.Entry<String, Map<String, Grouping>> entry : outputGroupings.entrySet()) {
+            String streamId = entry.getKey();
+            Map<String, Grouping> componentGrouping = entry.getValue();
+            Fields outFields = workerTopologyContext.getComponentOutputFields(componentId, streamId);
+            Map<String, LoadAwareCustomStreamGrouping> componentGrouper = new HashMap<>();
+            for (Map.Entry<String, Grouping> cg : componentGrouping.entrySet()) {
+                String component = cg.getKey();
+                Grouping grouping = cg.getValue();
+                List<Integer> outTasks = workerTopologyContext.getComponentTasks(component);
+                LoadAwareCustomStreamGrouping grouper = GrouperFactory.mkGrouper(
+                        workerTopologyContext, componentId, streamId, outFields, grouping, outTasks, stormConf);
+                componentGrouper.put(component, grouper);
+            }
+            if (!componentGrouper.isEmpty()) {
+                ret.put(streamId, componentGrouper);
+            }
+        }
+
+        for (String stream : workerTopologyContext.getComponentCommon(componentId).get_streams().keySet()) {
+            if (!ret.containsKey(stream)) {
+                ret.put(stream, null);
+            }
+        }
+
+        return ret;
+    }
+
+    private Map normalizedComponentConf(Map stormConf, WorkerTopologyContext topologyContext, String componentId) {
+        List<Object> keysToRemove = ConfigUtils.All_CONFIGS();
+        keysToRemove.remove(Config.TOPOLOGY_DEBUG);
+        keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
+        keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
+        keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
+        keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
+        keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
+        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
+        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
+        keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+
+        Map<Object, Object> componentConf;
+        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
+        if (specJsonConf != null) {
+            componentConf = (Map<Object, Object>) JSONValue.parse(specJsonConf);
+            for (Object p : keysToRemove) {
+                componentConf.remove(p);
+            }
+        } else {
+            componentConf = new HashMap<>();
+        }
+
+        Map<Object, Object> ret = new HashMap<>();
+        ret.putAll(stormConf);
+        ret.putAll(componentConf);
+
+        return ret;
+    }
+
+    // =============================================================================
+    // ============================ getter methods =================================
+    // =============================================================================
+
+    public List<Long> getExecutorId() {
+        return executorId;
+    }
+
+    public List<Integer> getTaskIds() {
+        return taskIds;
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public AtomicBoolean getOpenOrPrepareWasCalled() {
+        return openOrPrepareWasCalled;
+    }
+
+    public Map getStormConf() {
+        return stormConf;
+    }
+
+    public String getStormId() {
+        return stormId;
+    }
+
+    public CommonStats getStats() {
+        return stats;
+    }
+
+    public AtomicBoolean getThrottleOn() {
+        return throttleOn;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public Boolean getIsDebug() {
+        return isDebug;
+    }
+
+    public ExecutorTransfer getExecutorTransfer() {
+        return executorTransfer;
+    }
+
+    public IReportError getReportError() {
+        return reportError;
+    }
+
+    public WorkerTopologyContext getWorkerTopologyContext() {
+        return workerTopologyContext;
+    }
+
+    public Callable<Boolean> getSampler() {
+        return sampler;
+    }
+
+    public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() {
+        return stormComponentDebug;
+    }
+
+    public DisruptorQueue getReceiveQueue() {
+        return receiveQueue;
+    }
+
+    public AtomicBoolean getBackpressure() {
+        return backpressure;
+    }
+
+    public DisruptorQueue getTransferWorkerQueue() {
+        return transferQueue;
+    }
+
+    public IStormClusterState getStormClusterState() {
+        return stormClusterState;
+    }
+
+    public Map getWorkerData() {
+        return workerData;
+    }
+
+    public Map<String, Map<String, LoadAwareCustomStreamGrouping>> getStreamToComponentToGrouper() {
+        return streamToComponentToGrouper;
+    }
+
+    public HashMap getSharedExecutorData() {
+        return sharedExecutorData;
+    }
+
+    public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
+        return intervalToTaskToMetricToRegistry;
+    }
+
+    public IFn getTransferFn() {
+        return transferFn;
+    }
+
+    @VisibleForTesting
+    public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
+        this.executorTransfer = executorTransfer;
+    }
+}
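
A note on the watermark pair set at the end of registerBackpressure(): together
they give the flag hysteresis, so it turns on only when the receive queue
crosses the high watermark and off only after it drains below the low one.
A minimal standalone sketch of that behavior (not part of this commit; the
class, thresholds, and onQueueFill() hook below are hypothetical):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class WatermarkHysteresisDemo {
        private final AtomicBoolean backpressure = new AtomicBoolean(false);
        private final double highWaterMark = 0.9;
        private final double lowWaterMark = 0.4;

        // Called with the queue's current fill ratio (0.0 - 1.0).
        void onQueueFill(double fillRatio) {
            if (fillRatio >= highWaterMark && !backpressure.get()) {
                backpressure.set(true);   // congested: raise the flag once
            } else if (fillRatio <= lowWaterMark && backpressure.get()) {
                backpressure.set(false);  // drained: lower the flag once
            }
            // Between the watermarks the flag keeps its last value, which
            // prevents on/off flapping around a single threshold.
        }

        public static void main(String[] args) {
            WatermarkHysteresisDemo demo = new WatermarkHysteresisDemo();
            for (double fill : new double[]{0.5, 0.95, 0.7, 0.3, 0.5}) {
                demo.onQueueFill(fill);   // flips only at 0.95 and 0.3
            }
        }
    }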

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
new file mode 100644
index 0000000..795a33c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Constants;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
+    private final Executor executor;
+    private final List<Utils.SmartThread> threads;
+    private final Map<Integer, Task> taskDatas;
+
+    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+        this.executor = executor;
+        this.threads = threads;
+        this.taskDatas = taskDatas;
+    }
+
+    @Override
+    public ExecutorStats renderStats() {
+        return executor.getStats().renderStats();
+    }
+
+    @Override
+    public List<Long> getExecutorId() {
+        return executor.getExecutorId();
+    }
+
+    @Override
+    public void credentialsChanged(Credentials credentials) {
+        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), (int) Constants.SYSTEM_TASK_ID,
+                Constants.CREDENTIALS_CHANGED_STREAM_ID);
+        List<AddressedTuple> addressedTuple = Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+        executor.getReceiveQueue().publish(addressedTuple);
+    }
+
+    @Override
+    public boolean getBackPressureFlag() {
+        return executor.getBackpressure().get();
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+            executor.getReceiveQueue().haltWithInterrupt();
+            executor.getTransferWorkerQueue().haltWithInterrupt();
+            for (Utils.SmartThread t : threads) {
+                t.interrupt();
+                t.join();
+            }
+            executor.getStats().cleanupStats();
+            for (Task task : taskDatas.values()) {
+                TopologyContext userContext = task.getUserContext();
+                for (ITaskHook hook : userContext.getHooks()) {
+                    hook.cleanup();
+                }
+            }
+            executor.getStormClusterState().disconnect();
+            if (executor.getOpenOrPrepareWasCalled().get()) {
+                for (Task task : taskDatas.values()) {
+                    Object object = task.getTaskObject();
+                    if (object instanceof ISpout) {
+                        ((ISpout) object).close();
+                    } else if (object instanceof IBolt) {
+                        ((IBolt) object).cleanup();
+                    } else {
+                        LOG.error("unknown component object");
+                    }
+                }
+            }
+            LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}
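
shutdown() above is careful to interrupt and then join each thread before
running the task hooks and closing the spouts and bolts, so cleanup never
races with a live consumer. A minimal standalone sketch of that
interrupt-then-join step (not part of this commit; plain java.lang.Thread
stands in for Utils.SmartThread):

    import java.util.ArrayList;
    import java.util.List;

    public class InterruptJoinDemo {
        public static void main(String[] args) throws InterruptedException {
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                threads.add(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // Loop until interrupted, like a consumer thread.
                        while (!Thread.currentThread().isInterrupted()) {
                            try {
                                Thread.sleep(10); // simulated work
                            } catch (InterruptedException e) {
                                return; // exit promptly on interrupt
                            }
                        }
                    }
                }, "worker-" + i));
            }
            for (Thread t : threads) {
                t.start();
            }
            Thread.sleep(50);
            for (Thread t : threads) {
                t.interrupt(); // ask the worker to stop
                t.join();      // wait until it actually has; cleanup that
                               // runs after this cannot race a live thread
            }
            System.out.println("all workers joined; safe to run cleanup");
        }
    }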

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
new file mode 100644
index 0000000..8707319
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.EventHandler;
+import org.apache.storm.Config;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class ExecutorTransfer implements EventHandler, Callable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
+
+    private final DisruptorQueue batchTransferQueue;
+    private final KryoTupleSerializer serializer;
+    private final MutableObject cachedEmit;
+    private final IFn transferFn;
+    private final boolean isDebug;
+
+    public ExecutorTransfer(WorkerTopologyContext workerTopologyContext, DisruptorQueue batchTransferQueue, Map stormConf, IFn transferFn) {
+        this.batchTransferQueue = batchTransferQueue;
+        this.serializer = new KryoTupleSerializer(stormConf, workerTopologyContext);
+        this.cachedEmit = new MutableObject(new ArrayList<>());
+        this.transferFn = transferFn;
+        this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+    }
+
+    public void transfer(int task, Tuple tuple) {
+        AddressedTuple val = new AddressedTuple(task, tuple);
+        if (isDebug) {
+            LOG.info("TRANSFERRING tuple {}", val);
+        }
+        batchTransferQueue.publish(val);
+    }
+
+    @VisibleForTesting
+    public DisruptorQueue getBatchTransferQueue() {
+        return this.batchTransferQueue;
+    }
+
+    @Override
+    public Object call() throws Exception {
+        batchTransferQueue.consumeBatchWhenAvailable(this);
+        return 0L;
+    }
+
+    public String getName() {
+        return batchTransferQueue.getName();
+    }
+
+    @Override
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
+        ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
+        cachedEvents.add(event);
+        if (endOfBatch) {
+            transferFn.invoke(serializer, cachedEvents);
+            cachedEmit.setObject(new ArrayList<>());
+        }
+    }
+}
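
onEvent() above is a buffer-and-flush pattern: events accumulate in cachedEmit
until the disruptor marks the end of a batch, and a single transferFn call
then ships the whole list. A minimal standalone sketch of the same pattern
(not part of this commit; ship() is a hypothetical stand-in for transferFn):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchFlushDemo {
        private List<Object> buffer = new ArrayList<>();

        // Mirrors onEvent(): buffer each event, flush once per batch.
        void onEvent(Object event, boolean endOfBatch) {
            buffer.add(event);
            if (endOfBatch) {
                ship(buffer);                 // one downstream call per batch
                buffer = new ArrayList<>();   // fresh list, as with cachedEmit
            }
        }

        void ship(List<Object> batch) {
            System.out.println("shipping " + batch.size() + " events: " + batch);
        }

        public static void main(String[] args) {
            BatchFlushDemo demo = new BatchFlushDemo();
            demo.onEvent("a", false);
            demo.onEvent("b", false);
            demo.onEvent("c", true);   // flushes [a, b, c] in a single call
        }
    }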

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
new file mode 100644
index 0000000..43d297d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+
+import java.util.List;
+
+public interface IRunningExecutor {
+
+    ExecutorStats renderStats();
+    List<Long> getExecutorId();
+    void credentialsChanged(Credentials credentials);
+    boolean getBackPressureFlag();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
new file mode 100644
index 0000000..4b6d0fa
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TupleInfo implements Serializable {
+
+    private static final long serialVersionUID = -3348670497595864118L;
+
+    private int taskId;
+    private Object messageId;
+    private String stream;
+    private List<Object> values;
+    private long timestamp;
+    private String id;
+
+    public Object getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(Object messageId) {
+        this.messageId = messageId;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Object> values) {
+        this.values = values;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this,
+                ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
new file mode 100644
index 0000000..2fceb28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import clojure.lang.Atom;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+    private final Callable<Boolean> executeSampler;
+
+    public BoltExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+    }
+
+    @Override
+    public void init(Map<Integer, Task> idToTask) {
+        this.idToTask = idToTask;
+        LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+            Task taskData = entry.getValue();
+            IBolt boltObject = (IBolt) taskData.getTaskObject();
+            TopologyContext userContext = taskData.getUserContext();
+            taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+            if (boltObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) boltObject).setCredentials(credentials);
+            }
+            if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+                        "transfer", (DisruptorQueue) workerData.get("transfer-queue"));
+                BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+                Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref();
+                BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+                BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext);
+            } else {
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+                BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+            }
+
+            IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+            boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+        }
+        openOrPrepareWasCalled.set(true);
+        LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+        setupMetrics();
+    }
+
+    @Override
+    public Callable<Object> call() throws Exception {
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+        return new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+                return 0L;
+            }
+        };
+    }
+
+    @Override
+    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+        String streamId = tuple.getSourceStreamId();
+        if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+            Object taskObject = idToTask.get(taskId).getTaskObject();
+            if (taskObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
+            }
+        } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+            metricsTick(idToTask.get(taskId), tuple);
+        } else {
+            IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
+            boolean isSampled = sampler.call();
+            boolean isExecuteSampler = executeSampler.call();
+            Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+            if (isSampled) {
+                tuple.setProcessSampleStartTime(now);
+            }
+            if (isExecuteSampler) {
+                tuple.setExecuteSampleStartTime(now);
+            }
+            boltObject.execute(tuple);
+
+            Long ms = tuple.getExecuteSampleStartTime();
+            long delta = (ms != null) ? Time.deltaMs(ms) : 0;
+            if (isDebug) {
+                LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
+            }
+            new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+            if (delta != 0) {
+                ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+            }
+        }
+    }
+
+}
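
tupleActionFn() above stamps a start time on a tuple only when one of the
samplers fires, so latency bookkeeping is paid for just the sampled fraction;
delta stays 0 for unsampled tuples and the stats update is skipped. A minimal
standalone sketch of that idea (not part of this commit; a hypothetical random
sampler stands in for ConfigUtils.mkStatsSampler, which samples on a fixed
interval rather than randomly):

    import java.util.Random;

    public class SampledLatencyDemo {
        private static final Random RAND = new Random();

        // Hypothetical sampler: true for roughly 5% of tuples.
        static boolean sampled() {
            return RAND.nextInt(100) < 5;
        }

        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 50; i++) {
                // Stamp a start time only for sampled tuples.
                Long start = sampled() ? System.currentTimeMillis() : null;

                Thread.sleep(2); // simulated bolt.execute(tuple)

                // delta == 0 means "not sampled": skip the stats update.
                long delta = (start != null) ? System.currentTimeMillis() - start : 0;
                if (delta != 0) {
                    System.out.println("tuple " + i + " execute latency ~" + delta + " ms");
                }
            }
        }
    }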

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
new file mode 100644
index 0000000..34de025
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+    private final BoltExecutor executor;
+    private final Task taskData;
+    private final int taskId;
+    private final Random random;
+    private final boolean isEventLoggers;
+    private final boolean isDebug;
+
+    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
+                                   boolean isEventLoggers, boolean isDebug) {
+        this.executor = executor;
+        this.taskData = taskData;
+        this.taskId = taskId;
+        this.random = random;
+        this.isEventLoggers = isEventLoggers;
+        this.isDebug = isDebug;
+    }
+
+    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        return boltEmit(streamId, anchors, tuple, null);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        boltEmit(streamId, anchors, tuple, taskId);
+    }
+
+    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+        List<Integer> outTasks;
+        if (targetTaskId != null) {
+            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+        } else {
+            outTasks = taskData.getOutgoingTasks(streamId, values);
+        }
+
+        for (Integer t : outTasks) {
+            Map<Long, Long> anchorsToIds = new HashMap<>();
+            if (anchors != null) {
+                for (Tuple a : anchors) {
+                    Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
+                    if (!rootIds.isEmpty()) {
+                        long edgeId = MessageId.generateId(random);
+                        ((TupleImpl) a).updateAckVal(edgeId);
+                        for (Long rootId : rootIds) {
+                            putXor(anchorsToIds, rootId, edgeId);
+                        }
+                    }
+                }
+            }
+            MessageId msgId = MessageId.makeId(anchorsToIds);
+            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+            executor.getExecutorTransfer().transfer(t, tupleExt);
+        }
+        if (isEventLoggers) {
+            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+        }
+        return outTasks;
+    }
+
+    @Override
+    public void ack(Tuple input) {
+        long ackValue = ((TupleImpl) input).getAckVal();
+        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
+        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
+            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+                    new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+                    executor.getExecutorTransfer());
+        }
+        long delta = tupleTimeDelta((TupleImpl) input);
+        if (isDebug) {
+            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+        }
+        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+        boltAckInfo.applyOn(taskData.getUserContext());
+        if (delta != 0) {
+            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
+                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+        }
+    }
+
+    @Override
+    public void fail(Tuple input) {
+        Set<Long> roots = input.getMessageId().getAnchors();
+        for (Long root : roots) {
+            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+                    new Values(root), executor.getExecutorTransfer());
+        }
+        long delta = tupleTimeDelta((TupleImpl) input);
+        if (isDebug) {
+            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+        }
+        BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
+        boltFailInfo.applyOn(taskData.getUserContext());
+        if (delta != 0) {
+            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
+                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+        }
+    }
+
+    @Override
+    public void resetTimeout(Tuple input) {
+        Set<Long> roots = input.getMessageId().getAnchors();
+        for (Long root : roots) {
+            executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
+                    new Values(root), executor.getExecutorTransfer());
+        }
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        executor.getReportError().report(error);
+    }
+
+    private long tupleTimeDelta(TupleImpl tuple) {
+        Long ms = tuple.getProcessSampleStartTime();
+        if (ms != null) {
+            return Time.deltaMs(ms);
+        }
+        return 0;
+    }
+
+    private void putXor(Map<Long, Long> pending, Long key, Long id) {
+        Long curr = pending.get(key);
+        if (curr == null) {
+            curr = 0L;
+        }
+        pending.put(key, Utils.bitXor(curr, id));
+    }
+}
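
The anchoring loop in boltEmit() and the ack() path above both rely on the
acker's XOR invariant: each edge id is XORed into a tuple tree's checksum
exactly twice, once when the edge is created and once when it is acked, so a
fully processed tree cancels to zero. A minimal worked example of that
invariant (not part of this commit):

    import java.util.Random;

    public class XorAckDemo {
        public static void main(String[] args) {
            Random random = new Random();
            long checksum = 0L; // the acker's running value for one tuple tree

            // Spout emits the root tuple: XOR in the root's edge id.
            long rootEdge = random.nextLong();
            checksum ^= rootEdge;

            // A bolt emits two anchored child tuples: XOR in their edge ids
            // (this is what updateAckVal()/putXor() accumulate above).
            long child1 = random.nextLong();
            long child2 = random.nextLong();
            checksum ^= child1 ^ child2;

            // The bolt acks the root tuple: XOR in the root edge again.
            checksum ^= rootEdge;

            // Downstream bolts ack both children: each id goes in a second time.
            checksum ^= child1;
            checksum ^= child2;

            // Every id appeared exactly twice, so everything cancels.
            System.out.println("checksum after full processing = " + checksum); // 0
        }
    }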

http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
new file mode 100644
index 0000000..73451f8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+public interface IReportError {
+    void report(Throwable error);
+}


[4/5] storm git commit: Merge branch 'STORM-1277'

Posted by ka...@apache.org.
Merge branch 'STORM-1277'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7d450f3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7d450f3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7d450f3

Branch: refs/heads/master
Commit: c7d450f3fa871ff35452ac72b011e86c34446264
Parents: 44068c4 a5e19d9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 8 17:14:48 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:14:48 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  18 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 839 -------------------
 .../org/apache/storm/daemon/local_executor.clj  |  42 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |  70 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  16 +-
 .../src/jvm/org/apache/storm/Constants.java     |  24 +-
 .../org/apache/storm/daemon/StormCommon.java    | 175 ++--
 .../src/jvm/org/apache/storm/daemon/Task.java   |  76 +-
 .../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
 .../apache/storm/executor/ExecutorShutdown.java | 111 +++
 .../apache/storm/executor/ExecutorTransfer.java |  87 ++
 .../apache/storm/executor/IRunningExecutor.java |  31 +
 .../org/apache/storm/executor/TupleInfo.java    |  90 ++
 .../storm/executor/bolt/BoltExecutor.java       | 138 +++
 .../executor/bolt/BoltOutputCollectorImpl.java  | 171 ++++
 .../storm/executor/error/IReportError.java      |  22 +
 .../storm/executor/error/ReportError.java       |  76 ++
 .../storm/executor/error/ReportErrorAndDie.java |  47 ++
 .../storm/executor/spout/SpoutExecutor.java     | 255 ++++++
 .../spout/SpoutOutputCollectorImpl.java         | 147 ++++
 .../apache/storm/stats/BoltExecutorStats.java   |   1 +
 .../jvm/org/apache/storm/stats/CommonStats.java |   5 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   1 +
 .../org/apache/storm/task/TopologyContext.java  |  32 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  29 +
 .../storm/utils/WorkerBackpressureThread.java   |  11 +-
 .../org/apache/storm/integration_test.clj       |  11 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------