You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:27 UTC
[38/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
new file mode 100644
index 0000000..97b16d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+ private final Callable<Boolean> executeSampler;
+
+ public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+ }
+
+ public void init(Map<Integer, Task> idToTask) {
+ while (!stormActive.get()) {
+ Utils.sleep(100);
+ }
+
+ LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ IBolt boltObject = (IBolt) taskData.getTaskObject();
+ TopologyContext userContext = taskData.getUserContext();
+ taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+ if (boltObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) boltObject).setCredentials(credentials);
+ }
+ if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+ "transfer", workerData.getTransferQueue());
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+ Map cachedNodePortToSocket = (Map) workerData.getCachedNodeToPortSocket().get();
+ BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+ BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), stormConf, userContext);
+ } else {
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+ }
+
+ IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+ boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Callable<Object> call() throws Exception {
+ init(idToTask);
+
+ return new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+ return 0L;
+ }
+ };
+ }
+
+ @Override
+ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+ String streamId = tuple.getSourceStreamId();
+ if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+ Object taskObject = idToTask.get(taskId).getTaskObject();
+ if (taskObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
+ }
+ } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+ metricsTick(idToTask.get(taskId), tuple);
+ } else {
+ IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
+ boolean isSampled = sampler.call();
+ boolean isExecuteSampler = executeSampler.call();
+ Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+ if (isSampled) {
+ tuple.setProcessSampleStartTime(now);
+ }
+ if (isExecuteSampler) {
+ tuple.setExecuteSampleStartTime(now);
+ }
+ boltObject.execute(tuple);
+
+ Long ms = tuple.getExecuteSampleStartTime();
+ long delta = (ms != null) ? Time.deltaMs(ms) : 0;
+ if (isDebug) {
+ LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
+ }
+ new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
new file mode 100644
index 0000000..c490c3d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+ private final BoltExecutor executor;
+ private final Task taskData;
+ private final int taskId;
+ private final Random random;
+ private final boolean isEventLoggers;
+ private final boolean isDebug;
+
+ public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
+ boolean isEventLoggers, boolean isDebug) {
+ this.executor = executor;
+ this.taskData = taskData;
+ this.taskId = taskId;
+ this.random = random;
+ this.isEventLoggers = isEventLoggers;
+ this.isDebug = isDebug;
+ }
+
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ return boltEmit(streamId, anchors, tuple, null);
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ boltEmit(streamId, anchors, tuple, taskId);
+ }
+
+ private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+ List<Integer> outTasks;
+ if (targetTaskId != null) {
+ outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+ } else {
+ outTasks = taskData.getOutgoingTasks(streamId, values);
+ }
+
+ for (Integer t : outTasks) {
+ Map<Long, Long> anchorsToIds = new HashMap<>();
+ if (anchors != null) {
+ for (Tuple a : anchors) {
+ Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
+ if (rootIds.size() > 0) {
+ long edgeId = MessageId.generateId(random);
+ ((TupleImpl) a).updateAckVal(edgeId);
+ for (Long root_id : rootIds) {
+ putXor(anchorsToIds, root_id, edgeId);
+ }
+ }
+ }
+ }
+ MessageId msgId = MessageId.makeId(anchorsToIds);
+ TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+ executor.getExecutorTransfer().transfer(t, tupleExt);
+ }
+ if (isEventLoggers) {
+ executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+ }
+ return outTasks;
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ long ackValue = ((TupleImpl) input).getAckVal();
+ Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
+ for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
+ executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+ new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+ executor.getExecutorTransfer());
+ }
+ long delta = tupleTimeDelta((TupleImpl) input);
+ if (isDebug) {
+ LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+ }
+ BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+ boltAckInfo.applyOn(taskData.getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
+ input.getSourceComponent(), input.getSourceStreamId(), delta);
+ }
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ Set<Long> roots = input.getMessageId().getAnchors();
+ for (Long root : roots) {
+ executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer());
+ }
+ long delta = tupleTimeDelta((TupleImpl) input);
+ if (isDebug) {
+ LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+ }
+ BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
+ boltFailInfo.applyOn(taskData.getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
+ input.getSourceComponent(), input.getSourceStreamId(), delta);
+ }
+ }
+
+ @Override
+ public void resetTimeout(Tuple input) {
+ Set<Long> roots = input.getMessageId().getAnchors();
+ for (Long root : roots) {
+ executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer());
+ }
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ executor.getReportError().report(error);
+ }
+
+ private long tupleTimeDelta(TupleImpl tuple) {
+ Long ms = tuple.getProcessSampleStartTime();
+ if (ms != null)
+ return Time.deltaMs(ms);
+ return 0;
+ }
+
+ private void putXor(Map<Long, Long> pending, Long key, Long id) {
+ Long curr = pending.get(key);
+ if (curr == null) {
+ curr = 0l;
+ }
+ pending.put(key, Utils.bitXor(curr, id));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java b/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
new file mode 100644
index 0000000..73451f8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/IReportError.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+public interface IReportError {
+ void report(Throwable error);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java b/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
new file mode 100644
index 0000000..3272b85
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReportError implements IReportError {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
+
+ private final Map stormConf;
+ private final IStormClusterState stormClusterState;
+ private final String stormId;
+ private final String componentId;
+ private final WorkerTopologyContext workerTopologyContext;
+
+ private int maxPerInterval;
+ private int errorIntervalSecs;
+ private AtomicInteger intervalStartTime;
+ private AtomicInteger intervalErrors;
+
+ public ReportError(Map stormConf, IStormClusterState stormClusterState, String stormId, String componentId, WorkerTopologyContext workerTopologyContext) {
+ this.stormConf = stormConf;
+ this.stormClusterState = stormClusterState;
+ this.stormId = stormId;
+ this.componentId = componentId;
+ this.workerTopologyContext = workerTopologyContext;
+ this.errorIntervalSecs = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
+ this.maxPerInterval = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
+ this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
+ this.intervalErrors = new AtomicInteger(0);
+ }
+
+ @Override
+ public void report(Throwable error) {
+ LOG.error("Error", error);
+ if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
+ intervalErrors.set(0);
+ intervalStartTime.set(Time.currentTimeSecs());
+ }
+ if (intervalErrors.incrementAndGet() <= maxPerInterval) {
+ try {
+ stormClusterState.reportError(stormId, componentId, Utils.hostname(),
+ workerTopologyContext.getThisWorkerPort().longValue(), error);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java b/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
new file mode 100644
index 0000000..b2e1f34
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReportErrorAndDie implements Thread.UncaughtExceptionHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ReportErrorAndDie.class);
+ private final IReportError reportError;
+ private final Runnable suicideFn;
+
+ public ReportErrorAndDie(IReportError reportError, Runnable suicideFn) {
+ this.reportError = reportError;
+ this.suicideFn = suicideFn;
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ try {
+ reportError.report(e);
+ } catch (Exception ex) {
+ LOG.error("Error while reporting error to cluster, proceeding with shutdown", ex);
+ }
+ if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
+ || (Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)
+ && !Utils.exceptionCauseIsInstanceOf(java.net.SocketTimeoutException.class, e))) {
+ LOG.info("Got interrupted exception shutting thread down...");
+ } else {
+ suicideFn.run();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
new file mode 100644
index 0000000..9e7622c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.Callable;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+ private final ISpoutWaitStrategy spoutWaitStrategy;
+ private Integer maxSpoutPending;
+ private final AtomicBoolean lastActive;
+ private List<ISpout> spouts;
+ private List<SpoutOutputCollector> outputCollectors;
+ private final MutableLong emittedCount;
+ private final MutableLong emptyEmitStreak;
+ private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+ private final boolean hasAckers;
+ private RotatingMap<Long, TupleInfo> pending;
+ private final boolean backPressureEnabled;
+
+ public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.spoutWaitStrategy = ReflectionUtils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+ this.spoutWaitStrategy.prepare(stormConf);
+
+ this.backPressureEnabled = ObjectReader.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+ this.lastActive = new AtomicBoolean(false);
+ this.hasAckers = StormCommon.hasAckers(stormConf);
+ this.emittedCount = new MutableLong(0);
+ this.emptyEmitStreak = new MutableLong(0);
+ this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+ }
+
+ public void init(final Map<Integer, Task> idToTask) {
+ while (!stormActive.get()) {
+ Utils.sleep(100);
+ }
+
+ LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+ this.idToTask = idToTask;
+ this.maxSpoutPending = ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+ this.spouts = new ArrayList<>();
+ for (Task task : idToTask.values()) {
+ this.spouts.add((ISpout) task.getTaskObject());
+ }
+ this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+ @Override
+ public void expire(Long key, TupleInfo tupleInfo) {
+ Long timeDelta = null;
+ if (tupleInfo.getTimestamp() != 0) {
+ timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+ }
+ failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+ }
+ });
+
+ this.spoutThrottlingMetrics.registerAll(stormConf, idToTask.values().iterator().next().getUserContext());
+ this.outputCollectors = new ArrayList<>();
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ ISpout spoutObject = (ISpout) taskData.getTaskObject();
+ SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
+ spoutObject, this, taskData, entry.getKey(), emittedCount,
+ hasAckers, rand, hasEventLoggers, isDebug, pending);
+ SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
+ this.outputCollectors.add(outputCollector);
+
+ taskData.getBuiltInMetrics().registerAll(stormConf, taskData.getUserContext());
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, taskData.getUserContext());
+
+ if (spoutObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) spoutObject).setCredentials(credentials);
+ }
+ spoutObject.open(stormConf, taskData.getUserContext(), outputCollector);
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Callable<Object> call() throws Exception {
+ init(idToTask);
+
+ return new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ receiveQueue.consumeBatch(SpoutExecutor.this);
+
+ long currCount = emittedCount.get();
+ boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
+ boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+ boolean isActive = stormActive.get();
+ if (isActive) {
+ if (!lastActive.get()) {
+ lastActive.set(true);
+ LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
+ for (ISpout spout : spouts) {
+ spout.activate();
+ }
+ }
+ if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+ for (ISpout spout : spouts) {
+ spout.nextTuple();
+ }
+ }
+ } else {
+ if (lastActive.get()) {
+ lastActive.set(false);
+ LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
+ for (ISpout spout : spouts) {
+ spout.deactivate();
+ }
+ }
+ Time.sleep(100);
+ spoutThrottlingMetrics.skippedInactive(stats);
+ }
+ if (currCount == emittedCount.get() && isActive) {
+ emptyEmitStreak.increment();
+ spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+ if (throttleOn) {
+ spoutThrottlingMetrics.skippedThrottle(stats);
+ } else if (reachedMaxSpoutPending) {
+ spoutThrottlingMetrics.skippedMaxSpout(stats);
+ }
+ } else {
+ emptyEmitStreak.set(0);
+ }
+ return 0L;
+ }
+ };
+ }
+
+ @Override
+ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+ String streamId = tuple.getSourceStreamId();
+ if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ pending.rotate();
+ } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
+ metricsTick(idToTask.get(taskId), tuple);
+ } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
+ Object spoutObj = idToTask.get(taskId).getTaskObject();
+ if (spoutObj instanceof ICredentialsListener) {
+ ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
+ }
+ } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
+ Long id = (Long) tuple.getValue(0);
+ TupleInfo pendingForId = pending.get(id);
+ if (pendingForId != null) {
+ pending.put(id, pendingForId);
+ }
+ } else {
+ Long id = (Long) tuple.getValue(0);
+ Long timeDeltaMs = (Long) tuple.getValue(1);
+ TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+ if (tupleInfo != null && tupleInfo.getMessageId() != null) {
+ if (taskId != tupleInfo.getTaskId()) {
+ throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
+ }
+ long startTimeMs = tupleInfo.getTimestamp();
+ Long timeDelta = null;
+ if (startTimeMs != 0) {
+ timeDelta = timeDeltaMs;
+ }
+ if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
+ ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+ } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+ failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+ }
+ }
+ }
+ }
+
+ public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+ try {
+ ISpout spout = (ISpout) taskData.getTaskObject();
+ int taskId = taskData.getTaskId();
+ if (executor.getIsDebug()) {
+ LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
+ }
+ spout.ack(tupleInfo.getMessageId());
+ new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+ if (timeDelta != null) {
+ ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+ try {
+ ISpout spout = (ISpout) taskData.getTaskObject();
+ int taskId = taskData.getTaskId();
+ if (executor.getIsDebug()) {
+ LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
+ }
+ spout.fail(tupleInfo.getMessageId());
+ new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+ if (timeDelta != null) {
+ ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
new file mode 100644
index 0000000..f81b2c2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+ private final SpoutExecutor executor;
+ private final Task taskData;
+ private final int taskId;
+ private final MutableLong emittedCount;
+ private final boolean hasAckers;
+ private final Random random;
+ private final Boolean isEventLoggers;
+ private final Boolean isDebug;
+ private final RotatingMap<Long, TupleInfo> pending;
+
+ @SuppressWarnings("unused")
+ public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+ MutableLong emittedCount, boolean hasAckers, Random random,
+ Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
+ this.executor = executor;
+ this.taskData = taskData;
+ this.taskId = taskId;
+ this.emittedCount = emittedCount;
+ this.hasAckers = hasAckers;
+ this.random = random;
+ this.isEventLoggers = isEventLoggers;
+ this.isDebug = isDebug;
+ this.pending = pending;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ return sendSpoutMsg(streamId, tuple, messageId, null);
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+ sendSpoutMsg(streamId, tuple, messageId, taskId);
+ }
+
+ @Override
+ public long getPendingCount() {
+ return pending.size();
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ executor.getReportError().report(error);
+ }
+
+ private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+ emittedCount.increment();
+
+ List<Integer> outTasks;
+ if (outTaskId != null) {
+ outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
+ } else {
+ outTasks = taskData.getOutgoingTasks(stream, values);
+ }
+
+ List<Long> ackSeq = new ArrayList<>();
+ boolean needAck = (messageId != null) && hasAckers;
+
+ long rootId = MessageId.generateId(random);
+ for (Integer t : outTasks) {
+ MessageId msgId;
+ if (needAck) {
+ long as = MessageId.generateId(random);
+ msgId = MessageId.makeRootId(rootId, as);
+ ackSeq.add(as);
+ } else {
+ msgId = MessageId.makeUnanchored();
+ }
+
+ TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
+ executor.getExecutorTransfer().transfer(t, tuple);
+ }
+ if (isEventLoggers) {
+ executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+ }
+
+ boolean sample = false;
+ try {
+ sample = executor.getSampler().call();
+ } catch (Exception ignored) {
+ }
+ if (needAck) {
+ TupleInfo info = new TupleInfo();
+ info.setTaskId(this.taskId);
+ info.setStream(stream);
+ info.setMessageId(messageId);
+ if (isDebug) {
+ info.setValues(values);
+ }
+ if (sample) {
+ info.setTimestamp(System.currentTimeMillis());
+ }
+
+ pending.put(rootId, info);
+ List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
+ executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+ } else if (messageId != null) {
+ TupleInfo info = new TupleInfo();
+ info.setStream(stream);
+ info.setValues(values);
+ info.setMessageId(messageId);
+ info.setTimestamp(0);
+ Long timeDelta = sample ? 0L : null;
+ info.setId("0:");
+ executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+ }
+
+ return outTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java b/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
new file mode 100644
index 0000000..c1c072a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AccessControl.java
@@ -0,0 +1,627 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class AccessControl implements org.apache.thrift.TBase<AccessControl, AccessControl._Fields>, java.io.Serializable, Cloneable, Comparable<AccessControl> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AccessControl");
+
+ private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField ACCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("access", org.apache.thrift.protocol.TType.I32, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new AccessControlStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new AccessControlTupleSchemeFactory());
+ }
+
+ private AccessControlType type; // required
+ private String name; // optional
+ private int access; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see AccessControlType
+ */
+ TYPE((short)1, "type"),
+ NAME((short)2, "name"),
+ ACCESS((short)3, "access");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TYPE
+ return TYPE;
+ case 2: // NAME
+ return NAME;
+ case 3: // ACCESS
+ return ACCESS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __ACCESS_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.NAME};
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, AccessControlType.class)));
+ tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.ACCESS, new org.apache.thrift.meta_data.FieldMetaData("access", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AccessControl.class, metaDataMap);
+ }
+
+ public AccessControl() {
+ }
+
+ public AccessControl(
+ AccessControlType type,
+ int access)
+ {
+ this();
+ this.type = type;
+ this.access = access;
+ set_access_isSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public AccessControl(AccessControl other) {
+ __isset_bitfield = other.__isset_bitfield;
+ if (other.is_set_type()) {
+ this.type = other.type;
+ }
+ if (other.is_set_name()) {
+ this.name = other.name;
+ }
+ this.access = other.access;
+ }
+
+ public AccessControl deepCopy() {
+ return new AccessControl(this);
+ }
+
+ @Override
+ public void clear() {
+ this.type = null;
+ this.name = null;
+ set_access_isSet(false);
+ this.access = 0;
+ }
+
+ /**
+ *
+ * @see AccessControlType
+ */
+ public AccessControlType get_type() {
+ return this.type;
+ }
+
+ /**
+ *
+ * @see AccessControlType
+ */
+ public void set_type(AccessControlType type) {
+ this.type = type;
+ }
+
+ public void unset_type() {
+ this.type = null;
+ }
+
+ /** Returns true if field type is set (has been assigned a value) and false otherwise */
+ public boolean is_set_type() {
+ return this.type != null;
+ }
+
+ public void set_type_isSet(boolean value) {
+ if (!value) {
+ this.type = null;
+ }
+ }
+
+ public String get_name() {
+ return this.name;
+ }
+
+ public void set_name(String name) {
+ this.name = name;
+ }
+
+ public void unset_name() {
+ this.name = null;
+ }
+
+ /** Returns true if field name is set (has been assigned a value) and false otherwise */
+ public boolean is_set_name() {
+ return this.name != null;
+ }
+
+ public void set_name_isSet(boolean value) {
+ if (!value) {
+ this.name = null;
+ }
+ }
+
+ public int get_access() {
+ return this.access;
+ }
+
+ public void set_access(int access) {
+ this.access = access;
+ set_access_isSet(true);
+ }
+
+ public void unset_access() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ACCESS_ISSET_ID);
+ }
+
+ /** Returns true if field access is set (has been assigned a value) and false otherwise */
+ public boolean is_set_access() {
+ return EncodingUtils.testBit(__isset_bitfield, __ACCESS_ISSET_ID);
+ }
+
+ public void set_access_isSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ACCESS_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TYPE:
+ if (value == null) {
+ unset_type();
+ } else {
+ set_type((AccessControlType)value);
+ }
+ break;
+
+ case NAME:
+ if (value == null) {
+ unset_name();
+ } else {
+ set_name((String)value);
+ }
+ break;
+
+ case ACCESS:
+ if (value == null) {
+ unset_access();
+ } else {
+ set_access((Integer)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TYPE:
+ return get_type();
+
+ case NAME:
+ return get_name();
+
+ case ACCESS:
+ return get_access();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TYPE:
+ return is_set_type();
+ case NAME:
+ return is_set_name();
+ case ACCESS:
+ return is_set_access();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof AccessControl)
+ return this.equals((AccessControl)that);
+ return false;
+ }
+
+ public boolean equals(AccessControl that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_type = true && this.is_set_type();
+ boolean that_present_type = true && that.is_set_type();
+ if (this_present_type || that_present_type) {
+ if (!(this_present_type && that_present_type))
+ return false;
+ if (!this.type.equals(that.type))
+ return false;
+ }
+
+ boolean this_present_name = true && this.is_set_name();
+ boolean that_present_name = true && that.is_set_name();
+ if (this_present_name || that_present_name) {
+ if (!(this_present_name && that_present_name))
+ return false;
+ if (!this.name.equals(that.name))
+ return false;
+ }
+
+ boolean this_present_access = true;
+ boolean that_present_access = true;
+ if (this_present_access || that_present_access) {
+ if (!(this_present_access && that_present_access))
+ return false;
+ if (this.access != that.access)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_type = true && (is_set_type());
+ list.add(present_type);
+ if (present_type)
+ list.add(type.getValue());
+
+ boolean present_name = true && (is_set_name());
+ list.add(present_name);
+ if (present_name)
+ list.add(name);
+
+ boolean present_access = true;
+ list.add(present_access);
+ if (present_access)
+ list.add(access);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(AccessControl other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_type()).compareTo(other.is_set_type());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_type()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_name()).compareTo(other.is_set_name());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_name()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_access()).compareTo(other.is_set_access());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_access()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.access, other.access);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("AccessControl(");
+ boolean first = true;
+
+ sb.append("type:");
+ if (this.type == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.type);
+ }
+ first = false;
+ if (is_set_name()) {
+ if (!first) sb.append(", ");
+ sb.append("name:");
+ if (this.name == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.name);
+ }
+ first = false;
+ }
+ if (!first) sb.append(", ");
+ sb.append("access:");
+ sb.append(this.access);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_type()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'type' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_access()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'access' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class AccessControlStandardSchemeFactory implements SchemeFactory {
+ public AccessControlStandardScheme getScheme() {
+ return new AccessControlStandardScheme();
+ }
+ }
+
+ private static class AccessControlStandardScheme extends StandardScheme<AccessControl> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, AccessControl struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TYPE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.type = org.apache.storm.generated.AccessControlType.findByValue(iprot.readI32());
+ struct.set_type_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.name = iprot.readString();
+ struct.set_name_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // ACCESS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.access = iprot.readI32();
+ struct.set_access_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, AccessControl struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.type != null) {
+ oprot.writeFieldBegin(TYPE_FIELD_DESC);
+ oprot.writeI32(struct.type.getValue());
+ oprot.writeFieldEnd();
+ }
+ if (struct.name != null) {
+ if (struct.is_set_name()) {
+ oprot.writeFieldBegin(NAME_FIELD_DESC);
+ oprot.writeString(struct.name);
+ oprot.writeFieldEnd();
+ }
+ }
+ oprot.writeFieldBegin(ACCESS_FIELD_DESC);
+ oprot.writeI32(struct.access);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class AccessControlTupleSchemeFactory implements SchemeFactory {
+ public AccessControlTupleScheme getScheme() {
+ return new AccessControlTupleScheme();
+ }
+ }
+
+ private static class AccessControlTupleScheme extends TupleScheme<AccessControl> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeI32(struct.type.getValue());
+ oprot.writeI32(struct.access);
+ BitSet optionals = new BitSet();
+ if (struct.is_set_name()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_name()) {
+ oprot.writeString(struct.name);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, AccessControl struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.type = org.apache.storm.generated.AccessControlType.findByValue(iprot.readI32());
+ struct.set_type_isSet(true);
+ struct.access = iprot.readI32();
+ struct.set_access_isSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.name = iprot.readString();
+ struct.set_name_isSet(true);
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java b/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
new file mode 100644
index 0000000..cb75b9b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AccessControlType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum AccessControlType implements org.apache.thrift.TEnum {
+ OTHER(1),
+ USER(2);
+
+ private final int value;
+
+ private AccessControlType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static AccessControlType findByValue(int value) {
+ switch (value) {
+ case 1:
+ return OTHER;
+ case 2:
+ return USER;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java b/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
new file mode 100644
index 0000000..4e635fb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/generated/AlreadyAliveException.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)")
+public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException");
+
+ private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new AlreadyAliveExceptionStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new AlreadyAliveExceptionTupleSchemeFactory());
+ }
+
+ private String msg; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ MSG((short)1, "msg");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MSG
+ return MSG;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AlreadyAliveException.class, metaDataMap);
+ }
+
+ public AlreadyAliveException() {
+ }
+
+ public AlreadyAliveException(
+ String msg)
+ {
+ this();
+ this.msg = msg;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public AlreadyAliveException(AlreadyAliveException other) {
+ if (other.is_set_msg()) {
+ this.msg = other.msg;
+ }
+ }
+
+ public AlreadyAliveException deepCopy() {
+ return new AlreadyAliveException(this);
+ }
+
+ @Override
+ public void clear() {
+ this.msg = null;
+ }
+
+ public String get_msg() {
+ return this.msg;
+ }
+
+ public void set_msg(String msg) {
+ this.msg = msg;
+ }
+
+ public void unset_msg() {
+ this.msg = null;
+ }
+
+ /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+ public boolean is_set_msg() {
+ return this.msg != null;
+ }
+
+ public void set_msg_isSet(boolean value) {
+ if (!value) {
+ this.msg = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case MSG:
+ if (value == null) {
+ unset_msg();
+ } else {
+ set_msg((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MSG:
+ return get_msg();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MSG:
+ return is_set_msg();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof AlreadyAliveException)
+ return this.equals((AlreadyAliveException)that);
+ return false;
+ }
+
+ public boolean equals(AlreadyAliveException that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_msg = true && this.is_set_msg();
+ boolean that_present_msg = true && that.is_set_msg();
+ if (this_present_msg || that_present_msg) {
+ if (!(this_present_msg && that_present_msg))
+ return false;
+ if (!this.msg.equals(that.msg))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_msg = true && (is_set_msg());
+ list.add(present_msg);
+ if (present_msg)
+ list.add(msg);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(AlreadyAliveException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_msg()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("AlreadyAliveException(");
+ boolean first = true;
+
+ sb.append("msg:");
+ if (this.msg == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.msg);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_msg()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class AlreadyAliveExceptionStandardSchemeFactory implements SchemeFactory {
+ public AlreadyAliveExceptionStandardScheme getScheme() {
+ return new AlreadyAliveExceptionStandardScheme();
+ }
+ }
+
+ private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // MSG
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.msg = iprot.readString();
+ struct.set_msg_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.msg != null) {
+ oprot.writeFieldBegin(MSG_FIELD_DESC);
+ oprot.writeString(struct.msg);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class AlreadyAliveExceptionTupleSchemeFactory implements SchemeFactory {
+ public AlreadyAliveExceptionTupleScheme getScheme() {
+ return new AlreadyAliveExceptionTupleScheme();
+ }
+ }
+
+ private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.msg);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.msg = iprot.readString();
+ struct.set_msg_isSet(true);
+ }
+ }
+
+}
+