You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:27 UTC

[38/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
new file mode 100644
index 0000000..97b16d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Executor for bolt components. Prepares each bolt task owned by this
+ * executor (metrics registration, credentials, prepare()), then dispatches
+ * incoming tuples to the bolt's execute() with optional latency sampling.
+ */
+public class BoltExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+    // Sampler that decides whether to record an execute-latency start time for a tuple.
+    private final Callable<Boolean> executeSampler;
+
+    public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+    }
+
+    /**
+     * Prepares every bolt task: registers built-in and queue metrics, pushes
+     * credentials to bolts implementing {@link ICredentialsListener}, and
+     * calls prepare() with a per-task output collector. Blocks until the
+     * topology becomes active.
+     *
+     * @param idToTask map from task id to its {@link Task} data
+     */
+    public void init(Map<Integer, Task> idToTask) {
+        // Spin until the topology is marked active before preparing bolts.
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+
+        LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+            Task taskData = entry.getValue();
+            IBolt boltObject = (IBolt) taskData.getTaskObject();
+            TopologyContext userContext = taskData.getUserContext();
+            taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+            if (boltObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) boltObject).setCredentials(credentials);
+            }
+            if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+                // The system bolt additionally reports the worker transfer queue
+                // and connection metrics, not just this executor's queues.
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+                        "transfer", workerData.getTransferQueue());
+                BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+                Map cachedNodePortToSocket = (Map) workerData.getCachedNodeToPortSocket().get();
+                BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+                BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), stormConf, userContext);
+            } else {
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+                BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+            }
+
+            IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+            boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+        }
+        openOrPrepareWasCalled.set(true);
+        LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+        setupMetrics();
+    }
+
+    /**
+     * Initializes the bolt tasks, then returns the event-loop body that
+     * consumes batches from the receive queue via this executor's handler.
+     */
+    @Override
+    public Callable<Object> call() throws Exception {
+        init(idToTask);
+
+        return new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+                return 0L;
+            }
+        };
+    }
+
+    /**
+     * Routes a received tuple by its source stream: credentials updates,
+     * metrics ticks, or (the common case) the bolt's execute().
+     */
+    @Override
+    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+        String streamId = tuple.getSourceStreamId();
+        if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+            Object taskObject = idToTask.get(taskId).getTaskObject();
+            if (taskObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
+            }
+        } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+            metricsTick(idToTask.get(taskId), tuple);
+        } else {
+            IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
+            // Two independent samplers: one for process latency, one for execute latency.
+            boolean isSampled = sampler.call();
+            boolean isExecuteSampler = executeSampler.call();
+            // Take the timestamp once only if either sampler fired.
+            Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+            if (isSampled) {
+                tuple.setProcessSampleStartTime(now);
+            }
+            if (isExecuteSampler) {
+                tuple.setExecuteSampleStartTime(now);
+            }
+            boltObject.execute(tuple);
+
+            // delta is 0 when this tuple was not execute-sampled (ms == null).
+            Long ms = tuple.getExecuteSampleStartTime();
+            long delta = (ms != null) ? Time.deltaMs(ms) : 0;
+            if (isDebug) {
+                LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
+            }
+            new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+            // Only update execute stats for sampled tuples (nonzero delta).
+            if (delta != 0) {
+                ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
new file mode 100644
index 0000000..c490c3d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Output collector backing a bolt's {@code OutputCollector}. Implements
+ * tuple emission (with anchoring bookkeeping for the acker), ack/fail
+ * notifications, timeout resets, and error reporting for one bolt task.
+ */
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+    private final BoltExecutor executor;
+    private final Task taskData;
+    private final int taskId;
+    private final Random random;
+    private final boolean isEventLoggers;
+    private final boolean isDebug;
+
+    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
+                                   boolean isEventLoggers, boolean isDebug) {
+        this.executor = executor;
+        this.taskData = taskData;
+        this.taskId = taskId;
+        this.random = random;
+        this.isEventLoggers = isEventLoggers;
+        this.isDebug = isDebug;
+    }
+
+    /** Emits to all tasks subscribed to {@code streamId}; returns the target task ids. */
+    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        return boltEmit(streamId, anchors, tuple, null);
+    }
+
+    /** Emits directly to a single task id rather than via stream grouping. */
+    @Override
+    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        boltEmit(streamId, anchors, tuple, taskId);
+    }
+
+    /**
+     * Shared emit path. For each outgoing task, generates a fresh edge id,
+     * XORs it into every anchor's ack value and into the per-root-id map
+     * carried on the new tuple's MessageId, then hands the tuple to the
+     * executor's transfer function.
+     *
+     * @param targetTaskId explicit destination for direct emits, or null to
+     *                     resolve destinations from the stream grouping
+     */
+    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+        List<Integer> outTasks;
+        if (targetTaskId != null) {
+            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+        } else {
+            outTasks = taskData.getOutgoingTasks(streamId, values);
+        }
+
+        for (Integer t : outTasks) {
+            Map<Long, Long> anchorsToIds = new HashMap<>();
+            if (anchors != null) {
+                for (Tuple a : anchors) {
+                    Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
+                    if (rootIds.size() > 0) {
+                        long edgeId = MessageId.generateId(random);
+                        // Record the new edge on the anchor so its eventual ack cancels it out.
+                        ((TupleImpl) a).updateAckVal(edgeId);
+                        for (Long root_id : rootIds) {
+                            putXor(anchorsToIds, root_id, edgeId);
+                        }
+                    }
+                }
+            }
+            MessageId msgId = MessageId.makeId(anchorsToIds);
+            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+            executor.getExecutorTransfer().transfer(t, tupleExt);
+        }
+        if (isEventLoggers) {
+            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+        }
+        return outTasks;
+    }
+
+    /**
+     * Acks a tuple: for each root id, sends the XOR of its recorded id and the
+     * tuple's accumulated ack value to the acker, then updates hooks/stats.
+     */
+    @Override
+    public void ack(Tuple input) {
+        long ackValue = ((TupleImpl) input).getAckVal();
+        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
+        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
+            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+                    new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+                    executor.getExecutorTransfer());
+        }
+        long delta = tupleTimeDelta((TupleImpl) input);
+        if (isDebug) {
+            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+        }
+        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+        boltAckInfo.applyOn(taskData.getUserContext());
+        // Stats only updated for sampled tuples (nonzero delta).
+        if (delta != 0) {
+            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
+                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+        }
+    }
+
+    /**
+     * Fails a tuple: notifies the acker for every root id so the spout's
+     * pending entry is failed, then updates hooks/stats.
+     */
+    @Override
+    public void fail(Tuple input) {
+        Set<Long> roots = input.getMessageId().getAnchors();
+        for (Long root : roots) {
+            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+                    new Values(root), executor.getExecutorTransfer());
+        }
+        long delta = tupleTimeDelta((TupleImpl) input);
+        if (isDebug) {
+            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+        }
+        BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
+        boltFailInfo.applyOn(taskData.getUserContext());
+        if (delta != 0) {
+            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
+                    input.getSourceComponent(), input.getSourceStreamId(), delta);
+        }
+    }
+
+    /** Asks the acker to reset the timeout for every root id of this tuple. */
+    @Override
+    public void resetTimeout(Tuple input) {
+        Set<Long> roots = input.getMessageId().getAnchors();
+        for (Long root : roots) {
+            executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
+                    new Values(root), executor.getExecutorTransfer());
+        }
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        executor.getReportError().report(error);
+    }
+
+    // Milliseconds since the tuple's process-sample start, or 0 if it was not sampled.
+    private long tupleTimeDelta(TupleImpl tuple) {
+        Long ms = tuple.getProcessSampleStartTime();
+        if (ms != null)
+            return Time.deltaMs(ms);
+        return 0;
+    }
+
+    // XOR-accumulates an edge id into the map entry for the given root id.
+    private void putXor(Map<Long, Long> pending, Long key, Long id) {
+        Long curr = pending.get(key);
+        if (curr == null) {
+            curr = 0l;
+        }
+        pending.put(key, Utils.bitXor(curr, id));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java b/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
new file mode 100644
index 0000000..73451f8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+/**
+ * Callback used by executors to report an error encountered while running
+ * topology code.
+ */
+public interface IReportError {
+    /** Reports the given error; implementations decide where it is recorded. */
+    void report(Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java b/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
new file mode 100644
index 0000000..3272b85
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Reports executor errors to the cluster state, throttled to at most
+ * {@code maxPerInterval} reports per {@code errorIntervalSecs} window
+ * (both taken from topology configuration).
+ */
+public class ReportError implements IReportError {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
+
+    private final Map stormConf;
+    private final IStormClusterState stormClusterState;
+    private final String stormId;
+    private final String componentId;
+    private final WorkerTopologyContext workerTopologyContext;
+
+    // Throttle window length (secs) and maximum reports allowed per window.
+    private int maxPerInterval;
+    private int errorIntervalSecs;
+    // Start time (secs) of the current window and count of errors seen in it.
+    private AtomicInteger intervalStartTime;
+    private AtomicInteger intervalErrors;
+
+    public ReportError(Map stormConf, IStormClusterState stormClusterState, String stormId, String componentId, WorkerTopologyContext workerTopologyContext) {
+        this.stormConf = stormConf;
+        this.stormClusterState = stormClusterState;
+        this.stormId = stormId;
+        this.componentId = componentId;
+        this.workerTopologyContext = workerTopologyContext;
+        this.errorIntervalSecs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
+        this.maxPerInterval = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
+        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
+        this.intervalErrors = new AtomicInteger(0);
+    }
+
+    /**
+     * Logs the error always; forwards it to the cluster state only while the
+     * current throttle window has capacity left.
+     */
+    @Override
+    public void report(Throwable error) {
+        LOG.error("Error", error);
+        // Start a fresh throttle window if the previous one has elapsed.
+        // NOTE(review): the reset below is not done atomically with the check;
+        // concurrent callers may race here — presumably acceptable for throttling.
+        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
+            intervalErrors.set(0);
+            intervalStartTime.set(Time.currentTimeSecs());
+        }
+        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
+            try {
+                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
+                        workerTopologyContext.getThisWorkerPort().longValue(), error);
+            } catch (UnknownHostException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java b/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
new file mode 100644
index 0000000..b2e1f34
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uncaught-exception handler that first reports the error via
+ * {@link IReportError}, then either shuts the thread down quietly (for
+ * interruption-caused exceptions) or runs the suicide function to kill the
+ * worker.
+ */
+public class ReportErrorAndDie implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ReportErrorAndDie.class);
+    private final IReportError reportError;
+    // Action that terminates the worker process when a fatal error occurs.
+    private final Runnable suicideFn;
+
+    public ReportErrorAndDie(IReportError reportError, Runnable suicideFn) {
+        this.reportError = reportError;
+        this.suicideFn = suicideFn;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        try {
+            reportError.report(e);
+        } catch (Exception ex) {
+            // Reporting must not block shutdown; log and continue.
+            LOG.error("Error while reporting error to cluster, proceeding with shutdown", ex);
+        }
+        // Interruption (including InterruptedIOException, but not socket
+        // timeouts, which subclass it) means orderly shutdown — don't suicide.
+        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
+                || (Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)
+                    && !Utils.exceptionCauseIsInstanceOf(java.net.SocketTimeoutException.class, e))) {
+            LOG.info("Got interrupted exception shutting thread down...");
+        } else {
+            suicideFn.run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
new file mode 100644
index 0000000..9e7622c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.Callable;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Executor for spout components. Opens each spout task, runs the main loop
+ * that calls nextTuple() (subject to back pressure, max-spout-pending, and
+ * queue fullness), tracks in-flight tuples in a rotating pending map, and
+ * handles ack/fail/timeout notifications from the acker.
+ */
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    // Strategy invoked when a pass through the loop emits nothing.
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    // Max in-flight tuples across all tasks of this executor; 0 disables the cap.
+    private Integer maxSpoutPending;
+    // Tracks the last activation state so activate()/deactivate() fire on transitions.
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    // In-flight tuples keyed by root id; entries expiring out of rotation are failed as timeouts.
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = ReflectionUtils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    /**
+     * Opens every spout task: sets up the pending map with a timeout-failure
+     * callback, registers metrics, pushes credentials, and calls open() with
+     * a per-task output collector. Blocks until the topology is active.
+     *
+     * @param idToTask map from task id to its {@link Task} data
+     */
+    public void init(final Map<Integer, Task> idToTask) {
+        // Wait for the topology to become active before opening spouts.
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        // The configured per-task cap scales by the number of tasks in this executor.
+        this.maxSpoutPending = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            // Entries that rotate out without being acked are failed as timeouts.
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+            }
+        });
+
+        this.spoutThrottlingMetrics.registerAll(stormConf, idToTask.values().iterator().next().getUserContext());
+        this.outputCollectors = new ArrayList<>();
+        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+            Task taskData = entry.getValue();
+            ISpout spoutObject = (ISpout) taskData.getTaskObject();
+            SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
+                    spoutObject, this, taskData, entry.getKey(), emittedCount,
+                    hasAckers, rand, hasEventLoggers, isDebug, pending);
+            SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
+            this.outputCollectors.add(outputCollector);
+
+            taskData.getBuiltInMetrics().registerAll(stormConf, taskData.getUserContext());
+            Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+            BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, taskData.getUserContext());
+
+            if (spoutObject instanceof ICredentialsListener) {
+                ((ICredentialsListener) spoutObject).setCredentials(credentials);
+            }
+            spoutObject.open(stormConf, taskData.getUserContext(), outputCollector);
+        }
+        openOrPrepareWasCalled.set(true);
+        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+        setupMetrics();
+    }
+
+    /**
+     * Initializes the spout tasks, then returns the event-loop body: drain
+     * acker/system messages, toggle activation, call nextTuple() when
+     * allowed, and apply the wait strategy on empty-emit passes.
+     */
+    @Override
+    public Callable<Object> call() throws Exception {
+        init(idToTask);
+
+        return new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                receiveQueue.consumeBatch(SpoutExecutor.this);
+
+                // Snapshot the emit count to detect an empty pass afterwards.
+                long currCount = emittedCount.get();
+                boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
+                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+                boolean isActive = stormActive.get();
+                if (isActive) {
+                    if (!lastActive.get()) {
+                        // Transition inactive -> active: notify every spout once.
+                        lastActive.set(true);
+                        LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
+                        for (ISpout spout : spouts) {
+                            spout.activate();
+                        }
+                    }
+                    // Only ask for new tuples when the send queue has room, back
+                    // pressure is off, and we are under the pending cap.
+                    if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+                        for (ISpout spout : spouts) {
+                            spout.nextTuple();
+                        }
+                    }
+                } else {
+                    if (lastActive.get()) {
+                        // Transition active -> inactive: notify every spout once.
+                        lastActive.set(false);
+                        LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
+                        for (ISpout spout : spouts) {
+                            spout.deactivate();
+                        }
+                    }
+                    Time.sleep(100);
+                    spoutThrottlingMetrics.skippedInactive(stats);
+                }
+                if (currCount == emittedCount.get() && isActive) {
+                    // Nothing emitted this pass: back off per the wait strategy
+                    // and record why the spout was skipped.
+                    emptyEmitStreak.increment();
+                    spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+                    if (throttleOn) {
+                        spoutThrottlingMetrics.skippedThrottle(stats);
+                    } else if (reachedMaxSpoutPending) {
+                        spoutThrottlingMetrics.skippedMaxSpout(stats);
+                    }
+                } else {
+                    emptyEmitStreak.set(0);
+                }
+                return 0L;
+            }
+        };
+    }
+
+    /**
+     * Routes a received tuple by its source stream: system ticks rotate the
+     * pending map, metrics ticks and credential changes are handled inline,
+     * timeout resets re-put the pending entry, and acker ack/fail streams
+     * complete the corresponding in-flight tuple.
+     */
+    @Override
+    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+        String streamId = tuple.getSourceStreamId();
+        if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+            pending.rotate();
+        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
+            metricsTick(idToTask.get(taskId), tuple);
+        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
+            Object spoutObj = idToTask.get(taskId).getTaskObject();
+            if (spoutObj instanceof ICredentialsListener) {
+                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
+            }
+        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
+            Long id = (Long) tuple.getValue(0);
+            TupleInfo pendingForId = pending.get(id);
+            if (pendingForId != null) {
+                // Re-putting moves the entry to the newest rotation bucket,
+                // effectively resetting its timeout.
+                pending.put(id, pendingForId);
+            }
+        } else {
+            Long id = (Long) tuple.getValue(0);
+            Long timeDeltaMs = (Long) tuple.getValue(1);
+            TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
+                if (taskId != tupleInfo.getTaskId()) {
+                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
+                }
+                long startTimeMs = tupleInfo.getTimestamp();
+                // timeDelta stays null (no latency recorded) for unsampled tuples.
+                Long timeDelta = null;
+                if (startTimeMs != 0) {
+                    timeDelta = timeDeltaMs;
+                }
+                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
+                    ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+                    failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+                }
+            }
+        }
+    }
+
+    /**
+     * Invokes the spout's ack() for a fully-processed tuple and updates
+     * hooks and (when sampled, i.e. {@code timeDelta != null}) ack stats.
+     */
+    public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+        try {
+            ISpout spout = (ISpout) taskData.getTaskObject();
+            int taskId = taskData.getTaskId();
+            if (executor.getIsDebug()) {
+                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
+            }
+            spout.ack(tupleInfo.getMessageId());
+            new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+            if (timeDelta != null) {
+                ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    /**
+     * Invokes the spout's fail() for a failed or timed-out tuple and updates
+     * hooks and (when sampled) fail stats.
+     *
+     * @param reason diagnostic tag for the failure, e.g. "TIMEOUT" or "FAIL-STREAM"
+     */
+    public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+        try {
+            ISpout spout = (ISpout) taskData.getTaskObject();
+            int taskId = taskData.getTaskId();
+            if (executor.getIsDebug()) {
+                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
+            }
+            spout.fail(tupleInfo.getMessageId());
+            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+            if (timeDelta != null) {
+                ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+            }
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
new file mode 100644
index 0000000..f81b2c2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Spout-side output collector: routes emitted tuples to the downstream tasks
+ * chosen by the grouper ({@link Task#getOutgoingTasks}) and, when ackers are
+ * running and the caller supplied a message id, registers the tuple tree with
+ * the acker via an ACKER_INIT message and tracks it in the {@code pending}
+ * rotating map until it is acked, failed, or rotated out.
+ */
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+    private final SpoutExecutor executor;
+    private final Task taskData;
+    private final int taskId;
+    // Shared counter of tuples emitted by this task; bumped on every send.
+    private final MutableLong emittedCount;
+    // True when the topology runs at least one acker; without ackers tuples are
+    // acked immediately on emit (see sendSpoutMsg).
+    private final boolean hasAckers;
+    private final Random random;
+    private final Boolean isEventLoggers;
+    private final Boolean isDebug;
+    // rootId -> TupleInfo for in-flight anchored tuples, shared with the
+    // executor's ack/fail path; rotation implements message timeouts.
+    private final RotatingMap<Long, TupleInfo> pending;
+
+    // NOTE: the ISpout parameter is currently unused, hence the suppression.
+    @SuppressWarnings("unused")
+    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+                                    MutableLong emittedCount, boolean hasAckers, Random random,
+                                    Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
+        this.executor = executor;
+        this.taskData = taskData;
+        this.taskId = taskId;
+        this.emittedCount = emittedCount;
+        this.hasAckers = hasAckers;
+        this.random = random;
+        this.isEventLoggers = isEventLoggers;
+        this.isDebug = isDebug;
+        this.pending = pending;
+    }
+
+    /** Emits to all tasks selected by the stream's grouping; returns their task ids. */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        return sendSpoutMsg(streamId, tuple, messageId, null);
+    }
+
+    /** Emits directly to a single task, bypassing the grouping. */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        sendSpoutMsg(streamId, tuple, messageId, taskId);
+    }
+
+    /** Number of anchored tuples still awaiting ack/fail. */
+    @Override
+    public long getPendingCount() {
+        return pending.size();
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        executor.getReportError().report(error);
+    }
+
+    /**
+     * Core emit path shared by {@link #emit} and {@link #emitDirect}.
+     *
+     * @param stream    destination stream id
+     * @param values    tuple values
+     * @param messageId caller-supplied id; null means the tuple is unanchored
+     * @param outTaskId explicit target task for direct emit, or null to group
+     * @return task ids the tuple was sent to
+     */
+    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+        emittedCount.increment();
+
+        List<Integer> outTasks;
+        if (outTaskId != null) {
+            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
+        } else {
+            outTasks = taskData.getOutgoingTasks(stream, values);
+        }
+
+        // One anchor id per destination task; XOR of these seeds the acker entry.
+        List<Long> ackSeq = new ArrayList<>();
+        boolean needAck = (messageId != null) && hasAckers;
+
+        // rootId is generated unconditionally but only meaningful when needAck.
+        long rootId = MessageId.generateId(random);
+        for (Integer t : outTasks) {
+            MessageId msgId;
+            if (needAck) {
+                long as = MessageId.generateId(random);
+                msgId = MessageId.makeRootId(rootId, as);
+                ackSeq.add(as);
+            } else {
+                msgId = MessageId.makeUnanchored();
+            }
+
+            TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
+            executor.getExecutorTransfer().transfer(t, tuple);
+        }
+        if (isEventLoggers) {
+            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+        }
+
+        boolean sample = false;
+        try {
+            sample = executor.getSampler().call();
+        } catch (Exception ignored) {
+            // Sampler failure just means this tuple goes unsampled.
+        }
+        if (needAck) {
+            TupleInfo info = new TupleInfo();
+            info.setTaskId(this.taskId);
+            info.setStream(stream);
+            info.setMessageId(messageId);
+            if (isDebug) {
+                // Values are retained only in debug mode to keep memory bounded.
+                info.setValues(values);
+            }
+            if (sample) {
+                info.setTimestamp(System.currentTimeMillis());
+            }
+
+            // Track before telling the acker, then seed the acker with the XOR
+            // of all anchor ids on the ACKER_INIT stream.
+            pending.put(rootId, info);
+            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
+            executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+        } else if (messageId != null) {
+            // Message id given but no ackers running: ack immediately so the
+            // spout's ack callback still fires.
+            TupleInfo info = new TupleInfo();
+            info.setStream(stream);
+            info.setValues(values);
+            info.setMessageId(messageId);
+            info.setTimestamp(0);
+            Long timeDelta = sample ? 0L : null;
+            info.setId("0:");
+            executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+        }
+
+        return outTasks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java b/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
new file mode 100644
index 0000000..c1c072a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class AccessControl implements org.apache.thrift.TBase<AccessControl, AccessControl._Fields>, java.io.Serializable, Cloneable, Comparable<AccessControl> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AccessControl");
+
+  private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1);
+  private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField ACCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("access", org.apache.thrift.protocol.TType.I32, (short)3);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new AccessControlStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new AccessControlTupleSchemeFactory());
+  }
+
+  private AccessControlType type; // required
+  private String name; // optional
+  private int access; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    /**
+     * 
+     * @see AccessControlType
+     */
+    TYPE((short)1, "type"),
+    NAME((short)2, "name"),
+    ACCESS((short)3, "access");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // TYPE
+          return TYPE;
+        case 2: // NAME
+          return NAME;
+        case 3: // ACCESS
+          return ACCESS;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __ACCESS_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.NAME};
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, AccessControlType.class)));
+    tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.ACCESS, new org.apache.thrift.meta_data.FieldMetaData("access", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AccessControl.class, metaDataMap);
+  }
+
+  public AccessControl() {
+  }
+
+  public AccessControl(
+    AccessControlType type,
+    int access)
+  {
+    this();
+    this.type = type;
+    this.access = access;
+    set_access_isSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public AccessControl(AccessControl other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.is_set_type()) {
+      this.type = other.type;
+    }
+    if (other.is_set_name()) {
+      this.name = other.name;
+    }
+    this.access = other.access;
+  }
+
+  public AccessControl deepCopy() {
+    return new AccessControl(this);
+  }
+
+  @Override
+  public void clear() {
+    this.type = null;
+    this.name = null;
+    set_access_isSet(false);
+    this.access = 0;
+  }
+
+  /**
+   * 
+   * @see AccessControlType
+   */
+  public AccessControlType get_type() {
+    return this.type;
+  }
+
+  /**
+   * 
+   * @see AccessControlType
+   */
+  public void set_type(AccessControlType type) {
+    this.type = type;
+  }
+
+  public void unset_type() {
+    this.type = null;
+  }
+
+  /** Returns true if field type is set (has been assigned a value) and false otherwise */
+  public boolean is_set_type() {
+    return this.type != null;
+  }
+
+  public void set_type_isSet(boolean value) {
+    if (!value) {
+      this.type = null;
+    }
+  }
+
+  public String get_name() {
+    return this.name;
+  }
+
+  public void set_name(String name) {
+    this.name = name;
+  }
+
+  public void unset_name() {
+    this.name = null;
+  }
+
+  /** Returns true if field name is set (has been assigned a value) and false otherwise */
+  public boolean is_set_name() {
+    return this.name != null;
+  }
+
+  public void set_name_isSet(boolean value) {
+    if (!value) {
+      this.name = null;
+    }
+  }
+
+  public int get_access() {
+    return this.access;
+  }
+
+  public void set_access(int access) {
+    this.access = access;
+    set_access_isSet(true);
+  }
+
+  public void unset_access() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ACCESS_ISSET_ID);
+  }
+
+  /** Returns true if field access is set (has been assigned a value) and false otherwise */
+  public boolean is_set_access() {
+    return EncodingUtils.testBit(__isset_bitfield, __ACCESS_ISSET_ID);
+  }
+
+  public void set_access_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ACCESS_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case TYPE:
+      if (value == null) {
+        unset_type();
+      } else {
+        set_type((AccessControlType)value);
+      }
+      break;
+
+    case NAME:
+      if (value == null) {
+        unset_name();
+      } else {
+        set_name((String)value);
+      }
+      break;
+
+    case ACCESS:
+      if (value == null) {
+        unset_access();
+      } else {
+        set_access((Integer)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case TYPE:
+      return get_type();
+
+    case NAME:
+      return get_name();
+
+    case ACCESS:
+      return get_access();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case TYPE:
+      return is_set_type();
+    case NAME:
+      return is_set_name();
+    case ACCESS:
+      return is_set_access();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof AccessControl)
+      return this.equals((AccessControl)that);
+    return false;
+  }
+
+  public boolean equals(AccessControl that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_type = true && this.is_set_type();
+    boolean that_present_type = true && that.is_set_type();
+    if (this_present_type || that_present_type) {
+      if (!(this_present_type && that_present_type))
+        return false;
+      if (!this.type.equals(that.type))
+        return false;
+    }
+
+    boolean this_present_name = true && this.is_set_name();
+    boolean that_present_name = true && that.is_set_name();
+    if (this_present_name || that_present_name) {
+      if (!(this_present_name && that_present_name))
+        return false;
+      if (!this.name.equals(that.name))
+        return false;
+    }
+
+    boolean this_present_access = true;
+    boolean that_present_access = true;
+    if (this_present_access || that_present_access) {
+      if (!(this_present_access && that_present_access))
+        return false;
+      if (this.access != that.access)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_type = true && (is_set_type());
+    list.add(present_type);
+    if (present_type)
+      list.add(type.getValue());
+
+    boolean present_name = true && (is_set_name());
+    list.add(present_name);
+    if (present_name)
+      list.add(name);
+
+    boolean present_access = true;
+    list.add(present_access);
+    if (present_access)
+      list.add(access);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(AccessControl other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(is_set_type()).compareTo(other.is_set_type());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_type()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_name()).compareTo(other.is_set_name());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_name()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_access()).compareTo(other.is_set_access());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_access()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.access, other.access);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("AccessControl(");
+    boolean first = true;
+
+    sb.append("type:");
+    if (this.type == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.type);
+    }
+    first = false;
+    if (is_set_name()) {
+      if (!first) sb.append(", ");
+      sb.append("name:");
+      if (this.name == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.name);
+      }
+      first = false;
+    }
+    if (!first) sb.append(", ");
+    sb.append("access:");
+    sb.append(this.access);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_type()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'type' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_access()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'access' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class AccessControlStandardSchemeFactory implements SchemeFactory {
+    public AccessControlStandardScheme getScheme() {
+      return new AccessControlStandardScheme();
+    }
+  }
+
+  private static class AccessControlStandardScheme extends StandardScheme<AccessControl> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, AccessControl struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TYPE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.type = org.apache.storm.generated.AccessControlType.findByValue(iprot.readI32());
+              struct.set_type_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // NAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.name = iprot.readString();
+              struct.set_name_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // ACCESS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.access = iprot.readI32();
+              struct.set_access_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, AccessControl struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.type != null) {
+        oprot.writeFieldBegin(TYPE_FIELD_DESC);
+        oprot.writeI32(struct.type.getValue());
+        oprot.writeFieldEnd();
+      }
+      if (struct.name != null) {
+        if (struct.is_set_name()) {
+          oprot.writeFieldBegin(NAME_FIELD_DESC);
+          oprot.writeString(struct.name);
+          oprot.writeFieldEnd();
+        }
+      }
+      oprot.writeFieldBegin(ACCESS_FIELD_DESC);
+      oprot.writeI32(struct.access);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class AccessControlTupleSchemeFactory implements SchemeFactory {
+    public AccessControlTupleScheme getScheme() {
+      return new AccessControlTupleScheme();
+    }
+  }
+
+  private static class AccessControlTupleScheme extends TupleScheme<AccessControl> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeI32(struct.type.getValue());
+      oprot.writeI32(struct.access);
+      BitSet optionals = new BitSet();
+      if (struct.is_set_name()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.is_set_name()) {
+        oprot.writeString(struct.name);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.type = org.apache.storm.generated.AccessControlType.findByValue(iprot.readI32());
+      struct.set_type_isSet(true);
+      struct.access = iprot.readI32();
+      struct.set_access_isSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.name = iprot.readString();
+        struct.set_name_isSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java b/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
new file mode 100644
index 0000000..cb75b9b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+/**
+ * Thrift-generated principal type for an {@code AccessControl} entry
+ * (integer values fixed by the Thrift IDL; do not reorder).
+ */
+public enum AccessControlType implements org.apache.thrift.TEnum {
+  OTHER(1),
+  USER(2);
+
+  private final int value;
+
+  private AccessControlType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static AccessControlType findByValue(int value) { 
+    switch (value) {
+      case 1:
+        return OTHER;
+      case 2:
+        return USER;
+      default:
+        return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java b/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
new file mode 100644
index 0000000..4e635fb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Thrift-generated exception carrying a single required 'msg' string.
+// NOTE(review): judging by the name this is thrown when the submitted entity
+// is already running — confirm against the .thrift IDL. Regenerate from the
+// IDL rather than hand-editing.
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException");
+
+  private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+  // Two interchangeable wire encodings: the field-tagged standard scheme and
+  // the denser tuple scheme; the protocol in use selects which one is applied.
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new AlreadyAliveExceptionStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new AlreadyAliveExceptionTupleSchemeFactory());
+  }
+
+  private String msg; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    MSG((short)1, "msg");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it's not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // MSG
+          return MSG;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it's not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  // Field metadata registered with the Thrift runtime; 'msg' is REQUIRED, so
+  // validate() rejects instances where it is unset.
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AlreadyAliveException.class, metaDataMap);
+  }
+
+  public AlreadyAliveException() {
+  }
+
+  public AlreadyAliveException(
+    String msg)
+  {
+    this();
+    this.msg = msg;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public AlreadyAliveException(AlreadyAliveException other) {
+    if (other.is_set_msg()) {
+      this.msg = other.msg;
+    }
+  }
+
+  public AlreadyAliveException deepCopy() {
+    return new AlreadyAliveException(this);
+  }
+
+  @Override
+  public void clear() {
+    this.msg = null;
+  }
+
+  public String get_msg() {
+    return this.msg;
+  }
+
+  public void set_msg(String msg) {
+    this.msg = msg;
+  }
+
+  public void unset_msg() {
+    this.msg = null;
+  }
+
+  /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+  public boolean is_set_msg() {
+    return this.msg != null;
+  }
+
+  public void set_msg_isSet(boolean value) {
+    if (!value) {
+      this.msg = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case MSG:
+      if (value == null) {
+        unset_msg();
+      } else {
+        set_msg((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case MSG:
+      return get_msg();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case MSG:
+      return is_set_msg();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof AlreadyAliveException)
+      return this.equals((AlreadyAliveException)that);
+    return false;
+  }
+
+  public boolean equals(AlreadyAliveException that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_msg = true && this.is_set_msg();
+    boolean that_present_msg = true && that.is_set_msg();
+    if (this_present_msg || that_present_msg) {
+      if (!(this_present_msg && that_present_msg))
+        return false;
+      if (!this.msg.equals(that.msg))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_msg = true && (is_set_msg());
+    list.add(present_msg);
+    if (present_msg)
+      list.add(msg);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(AlreadyAliveException other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_msg()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("AlreadyAliveException(");
+    boolean first = true;
+
+    sb.append("msg:");
+    if (this.msg == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.msg);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_msg()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+  }
+
+  // Java serialization delegates to Thrift's compact protocol so serialized
+  // form stays consistent with the wire format rather than default field-by-
+  // field Java serialization.
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class AlreadyAliveExceptionStandardSchemeFactory implements SchemeFactory {
+    public AlreadyAliveExceptionStandardScheme getScheme() {
+      return new AlreadyAliveExceptionStandardScheme();
+    }
+  }
+
+  private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // MSG
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.msg = iprot.readString();
+              struct.set_msg_isSet(true);
+            } else { 
+              // Type mismatch for a known id: skip the value for forward compatibility.
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.msg != null) {
+        oprot.writeFieldBegin(MSG_FIELD_DESC);
+        oprot.writeString(struct.msg);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class AlreadyAliveExceptionTupleSchemeFactory implements SchemeFactory {
+    public AlreadyAliveExceptionTupleScheme getScheme() {
+      return new AlreadyAliveExceptionTupleScheme();
+    }
+  }
+
+  private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      // 'msg' is required, so the tuple scheme writes it unconditionally
+      // with no presence bitset.
+      oprot.writeString(struct.msg);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.msg = iprot.readString();
+      struct.set_msg_isSet(true);
+    }
+  }
+
+}
+