Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/08 08:17:37 UTC
[1/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java
Repository: storm
Updated Branches:
refs/heads/master 44068c419 -> 28563ece1
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
new file mode 100644
index 0000000..ec71ed4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/ReportError.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReportError implements IReportError {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);
+
+ private final Map stormConf;
+ private final IStormClusterState stormClusterState;
+ private final String stormId;
+ private final String componentId;
+ private final WorkerTopologyContext workerTopologyContext;
+
+ private int maxPerInterval;
+ private int errorIntervalSecs;
+ private AtomicInteger intervalStartTime;
+ private AtomicInteger intervalErrors;
+
+ public ReportError(Map stormConf, IStormClusterState stormClusterState, String stormId, String componentId, WorkerTopologyContext workerTopologyContext) {
+ this.stormConf = stormConf;
+ this.stormClusterState = stormClusterState;
+ this.stormId = stormId;
+ this.componentId = componentId;
+ this.workerTopologyContext = workerTopologyContext;
+ this.errorIntervalSecs = Utils.getInt(stormConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
+ this.maxPerInterval = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
+ this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
+ this.intervalErrors = new AtomicInteger(0);
+ }
+
+ @Override
+ public void report(Throwable error) {
+ LOG.error("Error", error);
+ if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
+ intervalErrors.set(0);
+ intervalStartTime.set(Time.currentTimeSecs());
+ }
+ if (intervalErrors.incrementAndGet() <= maxPerInterval) {
+ try {
+ stormClusterState.reportError(stormId, componentId, Utils.hostname(stormConf),
+ workerTopologyContext.getThisWorkerPort().longValue(), error);
+ } catch (UnknownHostException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ }
+ }
+}
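
The constructor above reads TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS and TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, mirroring the throttled-report-error-fn in the Clojure code this commit removes. For illustration only (not part of the commit), a minimal standalone sketch of that windowed error counting, with a hypothetical class name and sample values:

    // Sketch of the interval-based throttling used by ReportError.report(): errors are
    // counted per interval; once maxPerInterval is exceeded, the remaining errors in the
    // same interval are dropped. Values below are illustrative, not Storm defaults.
    public class ThrottleWindowSketch {
        private final int errorIntervalSecs = 10;  // stands in for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
        private final int maxPerInterval = 4;      // stands in for TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL
        private long intervalStartMs = System.currentTimeMillis();
        private int intervalErrors = 0;

        /** Returns true if this error should still be forwarded to cluster state. */
        public synchronized boolean shouldReport() {
            long now = System.currentTimeMillis();
            if ((now - intervalStartMs) / 1000 > errorIntervalSecs) {
                intervalErrors = 0;                // new interval: reset the counter
                intervalStartMs = now;
            }
            return ++intervalErrors <= maxPerInterval;
        }

        public static void main(String[] args) {
            ThrottleWindowSketch sketch = new ThrottleWindowSketch();
            for (int i = 1; i <= 6; i++) {
                System.out.println("error " + i + " forwarded? " + sketch.shouldReport());
            }
            // prints true for the first 4 errors, false afterwards within the same interval
        }
    }
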
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
new file mode 100644
index 0000000..d326af1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/ReportErrorAndDie.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReportErrorAndDie implements Thread.UncaughtExceptionHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ReportErrorAndDie.class);
+ private final IReportError reportError;
+ private final Runnable suicideFn;
+
+ public ReportErrorAndDie(IReportError reportError, Runnable suicideFn) {
+ this.reportError = reportError;
+ this.suicideFn = suicideFn;
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ try {
+ reportError.report(e);
+ } catch (Exception ex) {
+ LOG.error("Error while reporting error to cluster, proceeding with shutdown", ex);
+ }
+ if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
+ || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
+ LOG.info("Got interrupted exception shutting thread down...");
+ suicideFn.run();
+ }
+ }
+}
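
A hedged wiring sketch (not from this commit) of how a handler like ReportErrorAndDie is typically installed on an executor thread. It assumes IReportError declares only the report(Throwable) method seen above; the report and suicide implementations are illustrative stand-ins:

    import org.apache.storm.executor.error.IReportError;
    import org.apache.storm.executor.error.ReportErrorAndDie;

    // Hypothetical wiring: install ReportErrorAndDie as the thread's uncaught-exception
    // handler. The handler reports the error; per the logic above, it only invokes the
    // suicide function when the failure was caused by an interrupt.
    public class ReportErrorAndDieWiringSketch {
        public static void main(String[] args) throws InterruptedException {
            IReportError reportError = error -> System.err.println("reporting: " + error);
            Runnable suicideFn = () -> System.err.println("shutting worker down");

            Thread executorThread = new Thread(() -> {
                throw new IllegalStateException("simulated executor failure");
            });
            executorThread.setUncaughtExceptionHandler(new ReportErrorAndDie(reportError, suicideFn));
            executorThread.start();
            executorThread.join();   // the handler logs "reporting: ..." before the thread exits
        }
    }
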
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
new file mode 100644
index 0000000..a2ee650
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.Callable;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+ private final ISpoutWaitStrategy spoutWaitStrategy;
+ private Integer maxSpoutPending;
+ private final AtomicBoolean lastActive;
+ private List<ISpout> spouts;
+ private List<SpoutOutputCollector> outputCollectors;
+ private final MutableLong emittedCount;
+ private final MutableLong emptyEmitStreak;
+ private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+ private final boolean hasAckers;
+ private RotatingMap<Long, TupleInfo> pending;
+ private final boolean backPressureEnabled;
+
+ public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+ this.spoutWaitStrategy.prepare(stormConf);
+
+ this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+ this.lastActive = new AtomicBoolean(false);
+ this.hasAckers = StormCommon.hasAckers(stormConf);
+ this.emittedCount = new MutableLong(0);
+ this.emptyEmitStreak = new MutableLong(0);
+ this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+ }
+
+ @Override
+ public void init(final Map<Integer, Task> idToTask) {
+ LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+ this.idToTask = idToTask;
+ this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+ this.spouts = new ArrayList<>();
+ for (Task task : idToTask.values()) {
+ this.spouts.add((ISpout) task.getTaskObject());
+ }
+ this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+ @Override
+ public void expire(Long key, TupleInfo tupleInfo) {
+ Long timeDelta = null;
+ if (tupleInfo.getTimestamp() != 0) {
+ timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+ }
+ failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+ }
+ });
+
+ this.spoutThrottlingMetrics.registerAll(stormConf, idToTask.values().iterator().next().getUserContext());
+ this.outputCollectors = new ArrayList<>();
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ ISpout spoutObject = (ISpout) taskData.getTaskObject();
+ SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
+ spoutObject, this, taskData, entry.getKey(), emittedCount,
+ hasAckers, rand, hasEventLoggers, isDebug, pending);
+ SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
+ this.outputCollectors.add(outputCollector);
+
+ taskData.getBuiltInMetrics().registerAll(stormConf, taskData.getUserContext());
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, taskData.getUserContext());
+
+ if (spoutObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) spoutObject).setCredentials(credentials);
+ }
+ spoutObject.open(stormConf, taskData.getUserContext(), outputCollector);
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Callable<Object> call() throws Exception {
+ while (!stormActive.get()) {
+ Utils.sleep(100);
+ }
+
+ return new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ receiveQueue.consumeBatch(SpoutExecutor.this);
+
+ long currCount = emittedCount.get();
+ boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
+ boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+ boolean isActive = stormActive.get();
+ if (isActive) {
+ if (!lastActive.get()) {
+ lastActive.set(true);
+ LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
+ for (ISpout spout : spouts) {
+ spout.activate();
+ }
+ }
+ if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+ for (ISpout spout : spouts) {
+ spout.nextTuple();
+ }
+ }
+ } else {
+ if (lastActive.get()) {
+ lastActive.set(false);
+ LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
+ for (ISpout spout : spouts) {
+ spout.deactivate();
+ }
+ }
+ Time.sleep(100);
+ spoutThrottlingMetrics.skippedInactive(stats);
+ }
+ if (currCount == emittedCount.get() && isActive) {
+ emptyEmitStreak.increment();
+ spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+ if (throttleOn) {
+ spoutThrottlingMetrics.skippedThrottle(stats);
+ } else if (reachedMaxSpoutPending) {
+ spoutThrottlingMetrics.skippedMaxSpout(stats);
+ }
+ } else {
+ emptyEmitStreak.set(0);
+ }
+ return 0L;
+ }
+ };
+ }
+
+ @Override
+ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+ String streamId = tuple.getSourceStreamId();
+ if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ pending.rotate();
+ } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
+ metricsTick(idToTask.get(taskId), tuple);
+ } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
+ Object spoutObj = idToTask.get(taskId).getTaskObject();
+ if (spoutObj instanceof ICredentialsListener) {
+ ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
+ }
+ } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
+ Long id = (Long) tuple.getValue(0);
+ TupleInfo pendingForId = pending.get(id);
+ if (pendingForId != null) {
+ pending.put(id, pendingForId);
+ }
+ } else {
+ Long id = (Long) tuple.getValue(0);
+ Long timeDeltaMs = (Long) tuple.getValue(1);
+ TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+ if (tupleInfo.getMessageId() != null) {
+ if (taskId != tupleInfo.getTaskId()) {
+ throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
+ }
+ long startTimeMs = tupleInfo.getTimestamp();
+ Long timeDelta = null;
+ if (startTimeMs != 0) {
+ timeDelta = timeDeltaMs;
+ }
+ if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
+ ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+ } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+ failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+ }
+ }
+ }
+ }
+
+ public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+ try {
+ ISpout spout = (ISpout) taskData.getTaskObject();
+ int taskId = taskData.getTaskId();
+ if (executor.getIsDebug()) {
+ LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
+ }
+ spout.ack(tupleInfo.getMessageId());
+ new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+ if (timeDelta != null) {
+ ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+ try {
+ ISpout spout = (ISpout) taskData.getTaskObject();
+ int taskId = taskData.getTaskId();
+ if (executor.getIsDebug()) {
+ LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
+ }
+ spout.fail(tupleInfo.getMessageId());
+ new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+ if (timeDelta != null) {
+ ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
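
SpoutExecutor.call() returns a Callable that performs one pass of the spout loop (consume a batch, activate or deactivate as needed, maybe call nextTuple) and hands a Long back to its caller. The worker-side loop that drives it is not shown in this diff, so the sketch below is only a hypothetical driver that re-invokes the Callable and treats a positive return value as a pause (interpreted as milliseconds purely for the demo):

    import java.util.concurrent.Callable;

    // Hypothetical driver (not Storm's actual async loop): repeatedly invoke the
    // Callable produced by an executor and pause when it returns a positive Long.
    public class ExecutorLoopSketch {
        public static void main(String[] args) throws Exception {
            Callable<Object> onePass = () -> {
                System.out.println("one spout pass: consume batch, maybe nextTuple()");
                return 0L;   // SpoutExecutor's Callable likewise returns 0L after each pass
            };
            for (int i = 0; i < 3; i++) {          // the real loop runs until shutdown
                Object result = onePass.call();
                if (result instanceof Long && (Long) result > 0L) {
                    Thread.sleep((Long) result);   // demo-only interpretation of the return value
                }
            }
        }
    }
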
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
new file mode 100644
index 0000000..9fbb994
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+ private final SpoutExecutor executor;
+ private final Task taskData;
+ private final int taskId;
+ private final MutableLong emittedCount;
+ private final boolean hasAckers;
+ private final Random random;
+ private final Boolean isEventLoggers;
+ private final Boolean isDebug;
+ private final RotatingMap<Long, TupleInfo> pending;
+
+ @SuppressWarnings("unused")
+ public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+ MutableLong emittedCount, boolean hasAckers, Random random,
+ Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
+ this.executor = executor;
+ this.taskData = taskData;
+ this.taskId = taskId;
+ this.emittedCount = emittedCount;
+ this.hasAckers = hasAckers;
+ this.random = random;
+ this.isEventLoggers = isEventLoggers;
+ this.isDebug = isDebug;
+ this.pending = pending;
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+ return sendSpoutMsg(streamId, tuple, messageId, null);
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+ sendSpoutMsg(streamId, tuple, messageId, taskId);
+ }
+
+ @Override
+ public long getPendingCount() {
+ return pending.size();
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ executor.getReportError().report(error);
+ }
+
+ private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+ emittedCount.increment();
+
+ List<Integer> outTasks;
+ if (outTaskId != null) {
+ outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
+ } else {
+ outTasks = taskData.getOutgoingTasks(stream, values);
+ }
+
+ List<Long> ackSeq = new ArrayList<>();
+ boolean needAck = (messageId != null) && hasAckers;
+
+ long rootId = MessageId.generateId(random);
+ for (Integer t : outTasks) {
+ MessageId msgId;
+ if (needAck) {
+ long as = MessageId.generateId(random);
+ msgId = MessageId.makeRootId(rootId, as);
+ ackSeq.add(as);
+ } else {
+ msgId = MessageId.makeUnanchored();
+ }
+
+ TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
+ executor.getExecutorTransfer().transfer(t, tuple);
+ }
+ if (isEventLoggers) {
+ executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+ }
+
+ boolean sample = false;
+ try {
+ sample = executor.getSampler().call();
+ } catch (Exception ignored) {
+ }
+ if (needAck) {
+ TupleInfo info = new TupleInfo();
+ info.setTaskId(this.taskId);
+ info.setStream(stream);
+ info.setMessageId(messageId);
+ if (isDebug) {
+ info.setValues(values);
+ }
+ if (sample) {
+ info.setTimestamp(System.currentTimeMillis());
+ }
+
+ pending.put(rootId, info);
+ List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
+ executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+ } else if (messageId != null) {
+ TupleInfo info = new TupleInfo();
+ info.setStream(stream);
+ info.setValues(values);
+ info.setMessageId(messageId);
+ info.setTimestamp(0);
+ Long timeDelta = sample ? 0L : null;
+ info.setId("0:");
+ executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+ }
+
+ return outTasks;
+ }
+}
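
sendSpoutMsg above anchors each emitted edge with a random id, collects the ids in ackSeq, and sends their XOR (Utils.bitXorVals(ackSeq)) to the acker on ACKER_INIT_STREAM_ID. A standalone sketch of that XOR bookkeeping, independent of the Storm classes and with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // Sketch of the XOR bookkeeping behind spout anchoring: each emitted edge gets a
    // random anchor id; the spout sends the XOR of all anchor ids to the acker, and each
    // downstream ack XORs the same id back in. The tuple tree is complete when the
    // running value returns to zero.
    public class AckXorSketch {
        public static void main(String[] args) {
            Random random = new Random();
            List<Long> ackSeq = new ArrayList<>();
            for (int edge = 0; edge < 3; edge++) {
                ackSeq.add(random.nextLong());   // one anchor id per outgoing edge, as in sendSpoutMsg
            }

            long ackerValue = 0L;
            for (long anchor : ackSeq) {
                ackerValue ^= anchor;            // what Utils.bitXorVals(ackSeq) contributes at ACKER_INIT
            }
            for (long anchor : ackSeq) {
                ackerValue ^= anchor;            // each downstream ack XORs the same id back in
            }
            System.out.println("tree complete: " + (ackerValue == 0L));   // prints true
        }
    }
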
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index bfd0d36..6102549 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -84,6 +84,7 @@ public class BoltExecutorStats extends CommonStats {
}
+ @Override
public ExecutorStats renderStats() {
ExecutorStats ret = new ExecutorStats();
// common stats
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
index f7826f9..9b9a4f3 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -19,12 +19,13 @@ package org.apache.storm.stats;
import java.util.HashMap;
import java.util.Map;
+import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.internal.MultiCountStatAndMetric;
import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
@SuppressWarnings("unchecked")
-public class CommonStats {
+public abstract class CommonStats {
public static final int NUM_STAT_BUCKETS = 20;
public static final String RATE = "rate";
@@ -109,4 +110,6 @@ public class CommonStats {
return null;
}
+ public abstract ExecutorStats renderStats();
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 28c885a..c7ba844 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -58,6 +58,7 @@ public class SpoutExecutorStats extends CommonStats {
this.getFailed().incBy(stream, this.rate);
}
+ @Override
public ExecutorStats renderStats() {
ExecutorStats ret = new ExecutorStats();
// common fields
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
index 91cbee9..5092b1c 100644
--- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.task;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.StormTopology;
@@ -55,16 +56,25 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
private List<ITaskHook> _hooks = new ArrayList<>();
private Map<String, Object> _executorData;
private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
- private clojure.lang.Atom _openOrPrepareWasCalled;
-
-
- public TopologyContext(StormTopology topology, Map stormConf,
- Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String stormId, String codeDir, String pidDir, Integer taskId,
- Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
- clojure.lang.Atom openOrPrepareWasCalled) {
+ private AtomicBoolean _openOrPrepareWasCalled;
+
+
+ public TopologyContext(StormTopology topology,
+ Map stormConf,
+ Map<Integer, String> taskToComponent,
+ Map<String, List<Integer>> componentToSortedTasks,
+ Map<String, Map<String, Fields>> componentToStreamToFields,
+ String stormId,
+ String codeDir,
+ String pidDir,
+ Integer taskId,
+ Integer workerPort,
+ List<Integer> workerTasks,
+ Map<String, Object> defaultResources,
+ Map<String, Object> userResources,
+ Map<String, Object> executorData,
+ Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
+ AtomicBoolean openOrPrepareWasCalled) {
super(topology, stormConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir,
workerPort, workerTasks, defaultResources, userResources);
@@ -312,7 +322,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
* @return The IMetric argument unchanged.
*/
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
- if((Boolean) _openOrPrepareWasCalled.deref()) {
+ if(_openOrPrepareWasCalled.get()) {
throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
"IBolt::prepare() or ISpout::open() method.");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 026ad86..e17705e 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.utils;
+import clojure.lang.Keyword;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -1775,6 +1776,15 @@ public class Utils {
Runtime.getRuntime().exit(val);
}
+ public static Runnable mkSuicideFn() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ Utils.exitProcess(1, "Worker died");
+ }
+ };
+ }
+
/**
* "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
*
@@ -2381,4 +2391,23 @@ public class Utils {
return rtn;
}
+ /**
+ * converts a clojure PersistentMap to java HashMap
+ */
+ public static Map<String, Object> convertClojureMapToJavaMap(Map map) {
+ Map<String, Object> ret = new HashMap<>(map.size());
+ for (Object obj : map.entrySet()) {
+ Map.Entry entry = (Map.Entry) obj;
+ Keyword keyword = (Keyword) entry.getKey();
+ String key = keyword.getName();
+ if (key.startsWith(":")) {
+ key = key.substring(1, key.length());
+ }
+ Object value = entry.getValue();
+ ret.put(key, value);
+ }
+
+ return ret;
+ }
+
}
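
A hedged usage sketch for the new convertClojureMapToJavaMap helper, assuming clojure.lang is on the classpath. A plain java.util.HashMap with Keyword keys stands in for a Clojure PersistentMap here, which is enough to show the keyword-to-string key conversion:

    import clojure.lang.Keyword;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative call (not from this commit): the helper copies a Keyword-keyed map
    // into a HashMap keyed by each keyword's name.
    public class ClojureMapConversionSketch {
        public static void main(String[] args) {
            Map<Object, Object> clojureStyleMap = new HashMap<>();
            clojureStyleMap.put(Keyword.intern("storm-id"), "topo-1");
            clojureStyleMap.put(Keyword.intern("port"), 6700);

            Map<String, Object> javaMap =
                org.apache.storm.utils.Utils.convertClojureMapToJavaMap(clojureStyleMap);
            System.out.println(javaMap);   // e.g. {storm-id=topo-1, port=6700}
        }
    }
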
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
index f3b5a66..b7966e0 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
@@ -24,9 +24,9 @@ import org.slf4j.LoggerFactory;
public class WorkerBackpressureThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
- private Object trigger;
- private Object workerData;
- private WorkerBackpressureCallback callback;
+ private final Object trigger;
+ private final Object workerData;
+ private final WorkerBackpressureCallback callback;
private volatile boolean running = true;
public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
@@ -38,7 +38,7 @@ public class WorkerBackpressureThread extends Thread {
this.setUncaughtExceptionHandler(new BackpressureUncaughtExceptionHandler());
}
- static public void notifyBackpressureChecker(Object trigger) {
+ static public void notifyBackpressureChecker(final Object trigger) {
try {
synchronized (trigger) {
trigger.notifyAll();
@@ -57,7 +57,7 @@ public class WorkerBackpressureThread extends Thread {
public void run() {
while (running) {
try {
- synchronized(trigger) {
+ synchronized (trigger) {
trigger.wait(100);
}
callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
@@ -70,6 +70,7 @@ public class WorkerBackpressureThread extends Thread {
class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
+
@Override
public void uncaughtException(Thread t, Throwable e) {
// note that exception that happens during connecting to ZK has been ignored in the callback implementation
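
The hunks above make trigger, workerData, and callback final and keep the existing wait/notify handshake: run() waits up to 100 ms on the trigger, and notifyBackpressureChecker wakes it early when backpressure state changes. A self-contained sketch of that handshake, with hypothetical names:

    // Minimal wait/notify handshake in the style of WorkerBackpressureThread: one thread
    // polls on a shared monitor with a timeout, another wakes it early via notifyAll.
    public class BackpressureTriggerSketch {
        public static void main(String[] args) throws InterruptedException {
            final Object trigger = new Object();
            Thread checker = new Thread(() -> {
                try {
                    synchronized (trigger) {
                        trigger.wait(100);      // poll every 100 ms, like the run() loop above
                    }
                    System.out.println("checking executors for backpressure changes");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            checker.start();
            synchronized (trigger) {
                trigger.notifyAll();            // what notifyBackpressureChecker(trigger) does
            }
            checker.join();
        }
    }
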
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index b4827e9..787fbcd 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -403,17 +403,18 @@
(def bolt-prepared? (atom false))
(defbolt prepare-tracked-bolt [] {:prepare true}
[conf context collector]
- (reset! bolt-prepared? true)
(bolt
(execute [tuple]
+ (reset! bolt-prepared? true)
(ack! collector tuple))))
(def spout-opened? (atom false))
(defspout open-tracked-spout ["val"]
[conf context collector]
- (reset! spout-opened? true)
(spout
- (nextTuple [])))
+ (nextTuple []
+ (reset! spout-opened? true)
+ )))
(deftest test-submit-inactive-topology
(with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
@@ -440,8 +441,8 @@
(advance-cluster-time cluster 9)
(is (not @bolt-prepared?))
(is (not @spout-opened?))
- (.activate (:nimbus cluster) "test")
-
+ (.activate (:nimbus cluster) "test")
+
(advance-cluster-time cluster 12)
(assert-acked tracker 1)
(is @bolt-prepared?)
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 487d80f..dc56f81 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -20,7 +20,7 @@
(:import [org.apache.storm.grouping LoadMapping])
(:use [org.apache.storm testing log config])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon common executor])
+ (:use [org.apache.storm.daemon common])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils]
(org.apache.storm.daemon GrouperFactory)))
[5/5] storm git commit: add STORM-1277 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1277 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28563ece
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28563ece
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28563ece
Branch: refs/heads/master
Commit: 28563ece16274bcd61827f38e960ce2d27a57dea
Parents: c7d450f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 8 17:15:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:15:20 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/28563ece/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bdd4e71..4c8b35e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1277: port backtype.storm.daemon.executor to java
* STORM-2020: Stop using sun internal classes.
* STORM-2021: Fix license.
* STORM-2022: fix FieldsTest
[3/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java
Posted by ka...@apache.org.
STORM-1277 port backtype.storm.daemon.executor to java
* code rebased by Jungtaek Lim <ka...@gmail.com>
* Closes #1445
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5e19d9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5e19d9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5e19d9b
Branch: refs/heads/master
Commit: a5e19d9b8064f83adf00190ed74518e2156faae2
Parents: 44068c4
Author: 卫乐 <we...@taobao.com>
Authored: Fri Apr 29 17:58:50 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:13:48 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 18 +-
.../clj/org/apache/storm/daemon/executor.clj | 839 -------------------
.../org/apache/storm/daemon/local_executor.clj | 42 +
.../src/clj/org/apache/storm/daemon/worker.clj | 70 +-
storm-core/src/clj/org/apache/storm/testing.clj | 16 +-
.../src/jvm/org/apache/storm/Constants.java | 24 +-
.../org/apache/storm/daemon/StormCommon.java | 175 ++--
.../src/jvm/org/apache/storm/daemon/Task.java | 76 +-
.../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
.../apache/storm/executor/ExecutorShutdown.java | 111 +++
.../apache/storm/executor/ExecutorTransfer.java | 87 ++
.../apache/storm/executor/IRunningExecutor.java | 31 +
.../org/apache/storm/executor/TupleInfo.java | 90 ++
.../storm/executor/bolt/BoltExecutor.java | 138 +++
.../executor/bolt/BoltOutputCollectorImpl.java | 171 ++++
.../storm/executor/error/IReportError.java | 22 +
.../storm/executor/error/ReportError.java | 76 ++
.../storm/executor/error/ReportErrorAndDie.java | 47 ++
.../storm/executor/spout/SpoutExecutor.java | 255 ++++++
.../spout/SpoutOutputCollectorImpl.java | 147 ++++
.../apache/storm/stats/BoltExecutorStats.java | 1 +
.../jvm/org/apache/storm/stats/CommonStats.java | 5 +-
.../apache/storm/stats/SpoutExecutorStats.java | 1 +
.../org/apache/storm/task/TopologyContext.java | 32 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 29 +
.../storm/utils/WorkerBackpressureThread.java | 11 +-
.../org/apache/storm/integration_test.clj | 11 +-
.../test/clj/org/apache/storm/grouping_test.clj | 2 +-
28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index cc5436c..01a49b3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -52,20 +52,4 @@
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(Utils/exitProcess 13 "Error on initialization")
- )))))
-
-(defn worker-context [worker]
- (WorkerTopologyContext. (:system-topology worker)
- (:storm-conf worker)
- (:task->component worker)
- (:component->sorted-tasks worker)
- (:component->stream->fields worker)
- (:storm-id worker)
- (ConfigUtils/supervisorStormResourcesPath
- (ConfigUtils/supervisorStormDistRoot (:conf worker) (:storm-id worker)))
- (ConfigUtils/workerPidsRoot (:conf worker) (:worker-id worker))
- (:port worker)
- (:task-ids worker)
- (:default-shared-resources worker)
- (:user-shared-resources worker)
- ))
+ )))))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
deleted file mode 100644
index 1fdfbf5..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ /dev/null
@@ -1,839 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.executor
- (:use [org.apache.storm.daemon common])
- (:use [clojure.walk])
- (:import [org.apache.storm.generated Grouping Grouping$_Fields]
- [java.io Serializable]
- [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats]
- [org.apache.storm.daemon.metrics BuiltinMetricsUtil SpoutThrottlingMetrics])
- (:use [org.apache.storm util config log])
- (:import [java.util List Random HashMap ArrayList LinkedList Map])
- (:import [org.apache.storm ICredentialsListener Thrift])
- (:import [org.apache.storm.hooks ITaskHook])
- (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl MessageId])
- (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout SpoutOutputCollector ISpoutOutputCollector])
- (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
- (:import [org.apache.storm.grouping CustomStreamGrouping])
- (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
- (:import [org.apache.storm.generated GlobalStreamId])
- (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback])
- (:import [com.lmax.disruptor InsufficientCapacityException])
- (:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory])
- (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
- (:import [org.apache.storm Config Constants])
- (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
- (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
- (:import [java.lang Thread Thread$UncaughtExceptionHandler]
- [java.util.concurrent ConcurrentLinkedQueue]
- [org.json.simple JSONValue]
- [com.lmax.disruptor.dsl ProducerType]
- [org.apache.storm StormTimer])
- (:require [clojure.set :as set]))
-
-
-;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
-(defn- outbound-groupings
- [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
- (->> component->grouping
- (filter-key #(-> worker-context
- (.getComponentTasks %)
- count
- pos?))
- (map (fn [[component tgrouping]]
- [component
- (GrouperFactory/mkGrouper worker-context
- this-component-id
- stream-id
- out-fields
- tgrouping
- (.getComponentTasks worker-context component)
- topo-conf)]))
- (into {})
- (HashMap.)))
-
-(defn outbound-components
- "Returns map of stream id to component id to grouper"
- [^WorkerTopologyContext worker-context component-id topo-conf]
- (->> (.getTargets worker-context component-id)
- clojurify-structure
- (map (fn [[stream-id component->grouping]]
- [stream-id
- (outbound-groupings
- worker-context
- component-id
- stream-id
- (.getComponentOutputFields worker-context component-id stream-id)
- component->grouping
- topo-conf)]))
- (into (apply merge (map #(hash-map % nil) (.keySet (.get_streams (.getComponentCommon worker-context component-id))))))
- (HashMap.)))
-
-(defn executor-type [^WorkerTopologyContext context component-id]
- (let [topology (.getRawTopology context)
- spouts (.get_spouts topology)
- bolts (.get_bolts topology)]
- (cond (contains? spouts component-id) "spout"
- (contains? bolts component-id) "bolt"
- :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology))))))
-
-(defn executor-selector [executor-data & _] (keyword (:type executor-data)))
-
-(defmulti mk-threads executor-selector)
-(defmulti mk-executor-stats executor-selector)
-(defmulti close-component executor-selector)
-
-(defn- normalized-component-conf [storm-conf general-context component-id]
- (let [to-remove (disj (set ALL-CONFIGS)
- TOPOLOGY-DEBUG
- TOPOLOGY-MAX-SPOUT-PENDING
- TOPOLOGY-MAX-TASK-PARALLELISM
- TOPOLOGY-TRANSACTIONAL-ID
- TOPOLOGY-TICK-TUPLE-FREQ-SECS
- TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
- TOPOLOGY-SPOUT-WAIT-STRATEGY
- TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
- TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
- TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
- TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
- TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
- TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
- TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
- TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
- TOPOLOGY-STATE-PROVIDER
- TOPOLOGY-STATE-PROVIDER-CONFIG
- )
- spec-conf (-> general-context
- (.getComponentCommon component-id)
- .get_json_conf
- (#(if % (JSONValue/parse %)))
- clojurify-structure)]
- (merge storm-conf (apply dissoc spec-conf to-remove))
- ))
-
-(defprotocol RunningExecutor
- (render-stats [this])
- (get-executor-id [this])
- (credentials-changed [this creds])
- (get-backpressure-flag [this]))
-
-(defn throttled-report-error-fn [executor]
- (let [storm-conf (:storm-conf executor)
- error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
- max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
- interval-start-time (atom (Time/currentTimeSecs))
- interval-errors (atom 0)
- ]
- (fn [error]
- (log-error error)
- (when (> (Time/deltaSecs @interval-start-time)
- error-interval-secs)
- (reset! interval-errors 0)
- (reset! interval-start-time (Time/currentTimeSecs)))
- (swap! interval-errors inc)
-
- (when (<= @interval-errors max-per-interval)
- (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
- (Utils/hostname storm-conf)
- (long (.getThisWorkerPort (:worker-context executor))) error)
- ))))
-
-;; in its own function so that it can be mocked out by tracked topologies
-(defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
- (fn this
- [task tuple]
- (let [val (AddressedTuple. task tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "TRANSFERING tuple " val))
- (.publish ^DisruptorQueue batch-transfer->worker val))))
-
-(defn mk-executor-data [worker executor-id]
- (let [worker-context (worker-context worker)
- task-ids (clojurify-structure (StormCommon/executorIdToTasks executor-id))
- component-id (.getComponentId worker-context (first task-ids))
- storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
- executor-type (executor-type worker-context component-id)
- batch-transfer->worker (DisruptorQueue.
- (str "executor" executor-id "-send-queue")
- ProducerType/SINGLE
- (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
- (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
- (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
- (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
- ]
- (recursive-map
- :worker worker
- :worker-context worker-context
- :executor-id executor-id
- :task-ids task-ids
- :component-id component-id
- :open-or-prepare-was-called? (atom false)
- :storm-conf storm-conf
- :receive-queue ((:executor-receive-queue-map worker) executor-id)
- :storm-id (:storm-id worker)
- :conf (:conf worker)
- :shared-executor-data (HashMap.)
- :storm-active-atom (:storm-active-atom worker)
- :storm-component->debug-atom (:storm-component->debug-atom worker)
- :batch-transfer-queue batch-transfer->worker
- :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
- :suicide-fn (:suicide-fn worker)
- :storm-cluster-state (ClusterUtils/mkStormClusterState (:state-store worker) (Utils/getWorkerACL storm-conf)
- (ClusterStateContext. DaemonType/WORKER))
- :type executor-type
- ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
- :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
- :interval->task->metric-registry (HashMap.)
- :task->component (:task->component worker)
- :stream->component->grouper (outbound-components worker-context component-id storm-conf)
- :report-error (throttled-report-error-fn <>)
- :report-error-and-die (reify
- Thread$UncaughtExceptionHandler
- (uncaughtException [this _ error]
- (try
- ((:report-error <>) error)
- (catch Exception e
- (log-error e "Error while reporting error to cluster, proceeding with shutdown")))
- (if (or
- (Utils/exceptionCauseIsInstanceOf InterruptedException error)
- (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
- (log-message "Got interrupted excpetion shutting thread down...")
- ((:suicide-fn <>)))))
- :sampler (ConfigUtils/mkStatsSampler storm-conf)
- :backpressure (atom false)
- :spout-throttling-metrics (if (= (keyword executor-type) :spout)
- (SpoutThrottlingMetrics.)
- nil)
- ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
- )))
-
-(defn- mk-disruptor-backpressure-handler [executor-data]
- "make a handler for the executor's receive disruptor queue to
- check highWaterMark and lowWaterMark for backpressure"
- (reify DisruptorBackpressureCallback
- (highWaterMark [this]
- "When receive queue is above highWaterMark"
- (if (not @(:backpressure executor-data))
- (do (reset! (:backpressure executor-data) true)
- (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))
- (lowWaterMark [this]
- "When receive queue is below lowWaterMark"
- (if @(:backpressure executor-data)
- (do (reset! (:backpressure executor-data) false)
- (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
- (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))
-
-(defn start-batch-transfer->worker-handler! [worker executor-data]
- (let [worker-transfer-fn (:transfer-fn worker)
- cached-emit (MutableObject. (ArrayList.))
- storm-conf (:storm-conf executor-data)
- serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
- ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data)
- handler (reify com.lmax.disruptor.EventHandler
- (onEvent [this o seq-id batch-end?]
- (let [^ArrayList alist (.getObject cached-emit)]
- (.add alist o)
- (when batch-end?
- (worker-transfer-fn serializer alist)
- (.setObject cached-emit (ArrayList.))))))
- ]
- (Utils/asyncLoop
- (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
- (.getName batch-transfer-queue)
- (:uncaught-exception-handler (:report-error-and-die executor-data)))))
-
-;; TODO: this is all expensive... should be precomputed
-(defn send-unanchored
- [^Task task-data stream values transfer-fn]
- (let [out-tuple (.getTuple task-data stream values)]
- (fast-list-iter [t (.getOutgoingTasks task-data stream values)]
- (transfer-fn t out-tuple))))
-
-(defn setup-metrics! [executor-data]
- (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
- distinct-time-bucket-intervals (keys interval->task->metric-registry)]
- (doseq [interval distinct-time-bucket-intervals]
- (.scheduleRecurring
- (:user-timer (:worker executor-data))
- interval
- interval
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))))))
-
-(defn metrics-tick
- [executor-data task-data ^TupleImpl tuple]
- (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
- interval (.getInteger tuple 0)
- transfer-fn (:transfer-fn executor-data)
- task-id (.getTaskId task-data)
- name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
- task-info (IMetricsConsumer$TaskInfo.
- (Utils/hostname (:storm-conf executor-data))
- (.getThisWorkerPort worker-context)
- (:component-id executor-data)
- task-id
- (long (Time/currentTimeSecs))
- interval)
- data-points (->> name->imetric
- (map (fn [[name imetric]]
- (let [value (.getValueAndReset ^IMetric imetric)]
- (if value
- (IMetricsConsumer$DataPoint. name value)))))
- (filter identity)
- (into []))]
- (when (seq data-points)
- (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn))))
-
-(defn setup-ticks! [worker executor-data]
- (let [storm-conf (:storm-conf executor-data)
- tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
- receive-queue (:receive-queue executor-data)
- context (:worker-context executor-data)]
- (when tick-time-secs
- (if (or (Utils/isSystemId (:component-id executor-data))
- (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
- (= :spout (keyword (:type executor-data)))))
- (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
- (.scheduleRecurring
- (:user-timer worker)
- tick-time-secs
- tick-time-secs
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val))))))))
-
-(defn mk-executor [worker executor-id initial-credentials]
- (let [executor-data (mk-executor-data worker executor-id)
- transfer-fn (:transfer-fn executor-data)
- _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
- task-datas (->> executor-data
- :task-ids
- (map (fn [t] (let [task (Task. (stringify-keys executor-data) t)
- stream StormCommon/SYSTEM_STREAM_ID
- values ["startup"]]
- ;; when this is called, the threads for the executor haven't been started yet,
- ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
- (send-unanchored task stream values transfer-fn)
- [t task]
- )))
- (into {})
- (HashMap.))
- _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
- report-error-and-die (:report-error-and-die executor-data)
- component-id (:component-id executor-data)
-
-
- disruptor-handler (mk-disruptor-backpressure-handler executor-data)
- _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
- _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
- (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
- (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
-
- ;; starting the batch-transfer->worker ensures that anything publishing to that queue
- ;; doesn't block (because it's a single threaded queue and the caching/consumer started
- ;; trick isn't thread-safe)
- system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
- handlers (try
- (mk-threads executor-data task-datas initial-credentials)
- (catch Throwable t (.uncaughtException report-error-and-die nil t)))
- threads (concat handlers system-threads)]
- (setup-ticks! worker executor-data)
-
- (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
- ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
- (reify
- RunningExecutor
- (render-stats [this]
- (.renderStats (:stats executor-data)))
- (get-executor-id [this]
- executor-id)
- (credentials-changed [this creds]
- (let [receive-queue (:receive-queue executor-data)
- context (:worker-context executor-data)
- val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))
- (get-backpressure-flag [this]
- @(:backpressure executor-data))
- Shutdownable
- (shutdown
- [this]
- (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
- (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
- (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data))
- (doseq [t threads]
- (.interrupt t)
- (.join t))
-
- (.cleanupStats (:stats executor-data))
- (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
- (doseq [hook (.getHooks user-context)]
- (.cleanup hook)))
- (.disconnect (:storm-cluster-state executor-data))
- (when @(:open-or-prepare-was-called? executor-data)
- (doseq [obj (map #(.getTaskObject %) (vals task-datas))]
- (close-component executor-data obj)))
- (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
- )))
-
-(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id]
- (let [^ISpout spout (.getTaskObject task-data)
- storm-conf (:storm-conf executor-data)
- task-id (.getTaskId task-data)]
- ;;TODO: need to throttle these when there's lots of failures
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
- (.fail spout msg-id)
- (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data))
- (when time-delta
- (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
- (let [storm-conf (:storm-conf executor-data)
- ^ISpout spout (.getTaskObject task-data)
- task-id (.getTaskId task-data)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "SPOUT Acking message " id " " msg-id))
- (.ack spout msg-id)
- (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data))
- (when time-delta
- (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn mk-task-receiver [executor-data tuple-action-fn]
- (let [task-ids (:task-ids executor-data)
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
- ]
- (reify com.lmax.disruptor.EventHandler
- (onEvent [this tuple-batch sequence-id end-of-batch?]
- (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
- (let [^TupleImpl tuple (.getTuple addressed-tuple)
- task-id (.getDest addressed-tuple)]
- (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple))
- (if (not= task-id AddressedTuple/BROADCAST_DEST)
- (tuple-action-fn task-id tuple)
- ;; null task ids are broadcast tuples
- (fast-list-iter [task-id task-ids]
- (tuple-action-fn task-id tuple)
- ))
- ))))))
-
-(defn executor-max-spout-pending [storm-conf num-tasks]
- (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
- (if p (* p num-tasks))))
-
-(defn init-spout-wait-strategy [storm-conf]
- (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
- (.prepare ret storm-conf)
- ret
- ))
-
-;; Send sampled data to the eventlogger if the global or component level
-;; debug flag is set (via nimbus api).
-(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
- (let [c->d @(:storm-component->debug-atom executor-data)
- options (get c->d component-id (get c->d (:storm-id executor-data)))
- spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
- ;; the thread's initialized random number generator is used to generate
-    ;; uniformly distributed random numbers.
- (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct))
- (send-unanchored
- task-data
- StormCommon/EVENTLOGGER_STREAM_ID
- [component-id message-id (System/currentTimeMillis) values]
- (:transfer-fn executor-data)))))
-
-(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
- (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
- ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
- max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
- ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
- last-active (atom false)
- spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas)))
- rand (Random. (Utils/secureRandomLong))
- ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
-
- pending (RotatingMap.
- 2 ;; microoptimize for performance of .size method
- (reify RotatingMap$ExpiredCallback
- (expire [this id [task-id spout-id tuple-info start-time-ms]]
- (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
- (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
- ))))
- tuple-action-fn (fn [task-id ^TupleImpl tuple]
- (let [stream-id (.getSourceStreamId tuple)]
- (condp = stream-id
- Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
- Constants/CREDENTIALS_CHANGED_STREAM_ID
- (let [task-data (get task-datas task-id)
- spout-obj (.getTaskObject task-data)]
- (when (instance? ICredentialsListener spout-obj)
- (.setCredentials spout-obj (.getValue tuple 0))))
- Acker/ACKER_RESET_TIMEOUT_STREAM_ID
- (let [id (.getValue tuple 0)
- pending-for-id (.get pending id)]
- (when pending-for-id
- (.put pending id pending-for-id)))
- (let [id (.getValue tuple 0)
- time-delta-ms (.getValue tuple 1)
- [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
- (when spout-id
- (when-not (= stored-task-id task-id)
- (throw (RuntimeException. (str "Fatal error, mismatched task ids: " task-id " " stored-task-id))))
- (let [time-delta (if start-time-ms time-delta-ms)]
- (condp = stream-id
- Acker/ACKER_ACK_STREAM_ID (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta id)
- Acker/ACKER_FAIL_STREAM_ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
- )))
- ;; TODO: on failure, emit tuple to failure stream
- ))))
- receive-queue (:receive-queue executor-data)
- event-handler (mk-task-receiver executor-data tuple-action-fn)
- has-ackers? (StormCommon/hasAckers storm-conf)
- has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
- emitted-count (MutableLong. 0)
- empty-emit-streak (MutableLong. 0)
- spout-transfer-fn (fn []
- ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
- (log-message "Opening spout " component-id ":" (keys task-datas))
- (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas))))
- (doseq [[task-id task-data] task-datas
- :let [^ISpout spout-obj (.getTaskObject task-data)
- send-spout-msg (fn [out-stream-id values message-id out-task-id]
- (.increment emitted-count)
- (let [out-tasks (if out-task-id
- (.getOutgoingTasks task-data out-task-id out-stream-id values)
- (.getOutgoingTasks task-data out-stream-id values))
- rooted? (and message-id has-ackers?)
- root-id (if rooted? (MessageId/generateId rand))
- ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
- (fast-list-iter [out-task out-tasks id out-ids]
- (let [tuple-id (if rooted?
- (MessageId/makeRootId root-id id)
- (MessageId/makeUnanchored))
- out-tuple (TupleImpl. worker-context
- values
- task-id
- out-stream-id
- tuple-id)]
- (transfer-fn out-task out-tuple)))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id message-id rand))
- (if (and rooted?
- (not (.isEmpty out-ids)))
- (do
- (.put pending root-id [task-id
- message-id
- {:stream out-stream-id
- :values (if debug? values nil)}
- (if (.call ^Callable sampler) (System/currentTimeMillis))])
- (send-unanchored task-data
- Acker/ACKER_INIT_STREAM_ID
- [root-id (Utils/bitXorVals out-ids) task-id]
- (:transfer-fn executor-data)))
- (when message-id
- (ack-spout-msg executor-data task-data message-id
- {:stream out-stream-id :values values}
- (if (.call ^Callable sampler) 0) "0:")))
- (or out-tasks [])))]]
-
- (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data))
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" receive-queue}
- storm-conf (.getUserContext task-data))
-
- (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
- (.open spout-obj
- storm-conf
- (.getUserContext task-data)
- (SpoutOutputCollector.
- (reify ISpoutOutputCollector
- (^long getPendingCount[this]
- (.size pending))
- (^List emit [this ^String stream-id ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id nil))
- (^void emitDirect [this ^int out-task-id ^String stream-id
- ^List tuple ^Object message-id]
- (send-spout-msg stream-id tuple message-id out-task-id))
- (reportError [this error]
- (report-error error))))))
-
- (reset! open-or-prepare-was-called? true)
- (log-message "Opened spout " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (fn []
- ;; This design requires that spouts be non-blocking
- (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-
- (let [active? @(:storm-active-atom executor-data)
- curr-count (.get emitted-count)
- backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
- throttle-on (and backpressure-enabled
- @(:throttle-on (:worker executor-data)))
- reached-max-spout-pending (and max-spout-pending
- (>= (.size pending) max-spout-pending))]
- (if active?
- ; activated
- (do
- (when-not @last-active
- (reset! last-active true)
- (log-message "Activating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
- (if (and (not (.isFull transfer-queue))
- (not throttle-on)
- (not reached-max-spout-pending))
- (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
- ; deactivated
- (do
- (when @last-active
- (reset! last-active false)
- (log-message "Deactivating spout " component-id ":" (keys task-datas))
- (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
- ;; TODO: log that it's getting throttled
- (Time/sleep 100)
- (.skippedInactive (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
- (if (and (= curr-count (.get emitted-count)) active?)
- (do (.increment empty-emit-streak)
- (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
- ;; update the spout throttling metrics
- (if throttle-on
- (.skippedThrottle (:spout-throttling-metrics executor-data) (:stats executor-data))
- (if reached-max-spout-pending
- (.skippedMaxSpout (:spout-throttling-metrics executor-data) (:stats executor-data)))))
- (.set empty-emit-streak 0)))
- 0))]
-
- [(Utils/asyncLoop
- spout-transfer-fn
- false ; isDaemon
- (:report-error-and-die executor-data)
- Thread/NORM_PRIORITY
- true ; isFactory
- true ; startImmediately
- (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defn- tuple-time-delta! [^TupleImpl tuple]
- (let [ms (.getProcessSampleStartTime tuple)]
- (if ms
- (Time/deltaMs ms))))
-
-(defn- tuple-execute-time-delta! [^TupleImpl tuple]
- (let [ms (.getExecuteSampleStartTime tuple)]
- (if ms
- (Time/deltaMs ms))))
-
-(defn put-xor! [^Map pending key id]
- (let [curr (or (.get pending key) (long 0))]
- (.put pending key (bit-xor curr id))))
-
-(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
- (let [storm-conf (:storm-conf executor-data)
- execute-sampler (ConfigUtils/mkStatsSampler storm-conf)
- executor-stats (:stats executor-data)
- {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
- open-or-prepare-was-called?]} executor-data
- rand (Random. (Utils/secureRandomLong))
-
- tuple-action-fn (fn [task-id ^TupleImpl tuple]
- ;; synchronization needs to be done with a key provided by this bolt, otherwise:
- ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
- ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
- ;; buffer other tuples until fully synchronized, then process all of those tuples
- ;; then go into normal loop
- ;; spill to disk?
- ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
- ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
- ;; or just timeout the sync messages that are coming in until full sync is hit from that task
- ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
- ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
- ;; TODO: how to handle incremental updates as well as synchronizations at same time
- ;; TODO: need to version tuples somehow
-
- ;;(log-debug "Received tuple " tuple " at task " task-id)
- ;; need to do it this way to avoid reflection
- (let [stream-id (.getSourceStreamId tuple)]
- (condp = stream-id
- Constants/CREDENTIALS_CHANGED_STREAM_ID
- (let [^Task task-data (get task-datas task-id)
- bolt-obj (.getTaskObject task-data)]
- (when (instance? ICredentialsListener bolt-obj)
- (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0))))
- Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
- (let [^Task task-data (get task-datas task-id)
- ^IBolt bolt-obj (.getTaskObject task-data)
- user-context (.getUserContext task-data)
- sampler? (.call ^Callable sampler)
- execute-sampler? (.call ^Callable execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
- receive-queue (:receive-queue executor-data)]
- (when sampler?
- (.setProcessSampleStartTime tuple now))
- (when execute-sampler?
- (.setExecuteSampleStartTime tuple now))
- (.execute bolt-obj tuple)
- (let [delta (tuple-execute-time-delta! tuple)]
- (when (= true (storm-conf TOPOLOGY-DEBUG))
- (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-
- (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context)
- (when delta
- (.boltExecuteTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)))))))
- has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
- bolt-transfer-fn (fn []
- ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
- (while (not @(:storm-active-atom executor-data))
- (Thread/sleep 100))
-
- (log-message "Preparing bolt " component-id ":" (keys task-datas))
- (doseq [[task-id task-data] task-datas
- :let [^IBolt bolt-obj (.getTaskObject task-data)
- user-context (.getUserContext task-data)
- transfer-fn (:transfer-fn executor-data)
- bolt-emit (fn [stream anchors values task]
- (let [out-tasks (if task
- (.getOutgoingTasks task-data task stream values)
- (.getOutgoingTasks task-data stream values))]
- (fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)]
- (fast-list-iter [^TupleImpl a anchors]
- (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
- (when (pos? (count root-ids))
- (let [edge-id (MessageId/generateId rand)]
- (.updateAckVal a edge-id)
- (fast-list-iter [root-id root-ids]
- (put-xor! anchors-to-ids root-id edge-id))))))
- (let [tuple (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids))]
- (transfer-fn t tuple))))
- (if has-eventloggers?
- (send-to-eventlogger executor-data task-data values component-id nil rand))
- (or out-tasks [])))]]
- (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context)
- (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
- (if (= component-id Constants/SYSTEM_COMPONENT_ID)
- (do
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" (:receive-queue executor-data)
- "transfer" (:transfer-queue (:worker executor-data))}
- storm-conf user-context)
- (BuiltinMetricsUtil/registerIconnectionClientMetrics
- (.deref (:cached-node+port->socket (:worker executor-data))) storm-conf user-context)
- (BuiltinMetricsUtil/registerIconnectionServerMetric (:receiver (:worker executor-data)) storm-conf user-context))
- (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data)
- "receive" (:receive-queue executor-data)}
- storm-conf user-context))
-
- (.prepare bolt-obj
- storm-conf
- user-context
- (OutputCollector.
- (reify IOutputCollector
- (emit [this stream anchors values]
- (bolt-emit stream anchors values nil))
- (emitDirect [this task stream anchors values]
- (bolt-emit stream anchors values task))
- (^void ack [this ^Tuple tuple]
- (let [^TupleImpl tuple tuple
- ack-val (.getAckVal tuple)]
- (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
- (send-unanchored task-data
- Acker/ACKER_ACK_STREAM_ID
- [root (bit-xor id ack-val)]
- transfer-fn)))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.applyOn (BoltAckInfo. tuple task-id delta) user-context)
- (when delta
- (.boltAckedTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (^void fail [this ^Tuple tuple]
- (fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (send-unanchored task-data
- Acker/ACKER_FAIL_STREAM_ID
- [root]
- transfer-fn))
- (let [delta (tuple-time-delta! tuple)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))]
- (when debug?
- (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.applyOn (BoltFailInfo. tuple task-id delta) user-context)
- (when delta
- (.boltFailedTuple executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta))))
- (^void resetTimeout [this ^Tuple tuple]
- (fast-list-iter [root (.. tuple getMessageId getAnchors)]
- (send-unanchored task-data
- Acker/ACKER_RESET_TIMEOUT_STREAM_ID
- [root]
- transfer-fn)))
- (reportError [this error]
- (report-error error))))))
- (reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
- (setup-metrics! executor-data)
-
- (let [receive-queue (:receive-queue executor-data)
- event-handler (mk-task-receiver executor-data tuple-action-fn)]
- (fn []
- (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler)
- 0)))]
- ;; TODO: can get any SubscribedState objects out of the context now
-
- [(Utils/asyncLoop
- bolt-transfer-fn
- false ; isDaemon
- (:report-error-and-die executor-data)
- Thread/NORM_PRIORITY
- true ; isFactory
- true ; startImmediately
- (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defmethod close-component :spout [executor-data spout]
- (.close spout))
-
-(defmethod close-component :bolt [executor-data bolt]
- (.cleanup bolt))
-
-;; TODO: refactor this to be part of an executor-specific map
-(defmethod mk-executor-stats :spout [_ rate]
- (SpoutExecutorStats. rate))
-
-(defmethod mk-executor-stats :bolt [_ rate]
- (BoltExecutorStats. rate))
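
The removed bolt emit/ack path above leans on Storm's XOR acking invariant: put-xor! folds a freshly generated edge id into the per-root tracking value when a tuple is emitted, .updateAckVal records it on the anchor, and the same id is XORed in again via (bit-xor id ack-val) when the tuple is acked (the spout side seeds the acker with Utils/bitXorVals), so the value for a root returns to zero exactly when the whole tuple tree has been processed. A minimal standalone Java sketch of that bookkeeping; the class below is hypothetical and is not Storm's Acker implementation:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    // Standalone illustration of the XOR bookkeeping used by the acking code above
    // (put-xor!, .updateAckVal, Utils/bitXorVals). Hypothetical class, not Storm's Acker.
    public class XorAckSketch {
        private final Map<Long, Long> pending = new HashMap<>(); // root id -> running XOR
        private final Random rand = new Random();

        long emit(long rootId) {
            long edgeId = rand.nextLong();                  // id for the new tuple-tree edge
            pending.merge(rootId, edgeId, (a, b) -> a ^ b); // same effect as put-xor!
            return edgeId;
        }

        void ack(long rootId, long edgeId) {
            pending.merge(rootId, edgeId, (a, b) -> a ^ b); // XOR the same id a second time
            if (pending.get(rootId) == 0L) {
                pending.remove(rootId);                     // tree complete -> spout ack
                System.out.println("root " + rootId + " fully acked");
            }
        }

        public static void main(String[] args) {
            XorAckSketch sketch = new XorAckSketch();
            long e1 = sketch.emit(42L);
            long e2 = sketch.emit(42L);
            sketch.ack(42L, e1);
            sketch.ack(42L, e2); // every edge id XORed twice -> value back to 0 -> complete
        }
    }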
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
new file mode 100644
index 0000000..1e46e37
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
@@ -0,0 +1,42 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-executor
+ (:use [org.apache.storm util config log])
+ (:import [org.apache.storm.tuple AddressedTuple]
+ [org.apache.storm.executor Executor ExecutorTransfer])
+ (:import [org.apache.storm.utils DisruptorQueue])
+ (:import [org.apache.storm Config Constants]))
+
+(defn local-transfer-executor-tuple []
+ (fn [task tuple batch-transfer->worker]
+ (let [val (AddressedTuple. task tuple)]
+ (.publish ^DisruptorQueue batch-transfer->worker val))))
+
+(defn mk-local-executor-transfer [worker-topology-context batch-queue storm-conf transfer-fn]
+ (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf transfer-fn]
+ (transfer [task tuple]
+ (let [batch-transfer->worker (.getBatchTransferQueue this)]
+ ((local-transfer-executor-tuple) task tuple batch-transfer->worker)))))
+
+(defn mk-local-executor [workerData executorId credentials]
+ (let [executor (Executor/mkExecutor workerData executorId credentials)
+ worker-topology-context (.getWorkerTopologyContext executor)
+ batch-transfer-queue (.getTransferWorkerQueue executor)
+ storm-conf (.getStormConf executor)
+ transfer-fn (.getTransferFn executor)
+ local-executor-transfer (mk-local-executor-transfer worker-topology-context batch-transfer-queue storm-conf transfer-fn)]
+ (.setLocalExecutorTransfer executor local-executor-transfer)
+ (.execute executor)))
\ No newline at end of file
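
The new local_executor.clj short-circuits the transfer path for local mode: the proxied transfer wraps the tuple in an AddressedTuple and publishes it straight onto the executor's batch-transfer DisruptorQueue instead of handing it to the worker's remote transfer. A rough Java sketch of that substitution, using hypothetical stand-in types (TransferSketch, AddressedTupleSketch) rather than the real ExecutorTransfer/DisruptorQueue signatures:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical stand-ins; the real code uses AddressedTuple and DisruptorQueue.
    class AddressedTupleSketch {
        final int destTask;
        final Object tuple;
        AddressedTupleSketch(int destTask, Object tuple) { this.destTask = destTask; this.tuple = tuple; }
    }

    interface TransferSketch {
        void transfer(int task, Object tuple);
    }

    public class LocalTransferSketch implements TransferSketch {
        // In local mode the "network" is just the destination executor's in-process queue.
        private final BlockingQueue<AddressedTupleSketch> batchTransferToWorker = new LinkedBlockingQueue<>();

        @Override
        public void transfer(int task, Object tuple) {
            // Same shape as local-transfer-executor-tuple: wrap and publish locally.
            batchTransferToWorker.offer(new AddressedTupleSketch(task, tuple));
        }

        public static void main(String[] args) {
            LocalTransferSketch t = new LocalTransferSketch();
            t.transfer(7, "hello");
            System.out.println("queued locally: " + t.batchTransferToWorker.size());
        }
    }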
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index dddce68..781558c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -18,11 +18,13 @@
(:use [org.apache.storm config log util converter local-state-converter])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
- (:require [org.apache.storm.daemon [executor :as executor]])
(:require [clojure.set :as set])
+ (:require [org.apache.storm.daemon
+ [local-executor :as local-executor]])
(:import [java.io File]
- [org.apache.storm.stats StatsUtil])
+ [org.apache.storm.stats StatsUtil]
+ [java.util.concurrent.atomic AtomicBoolean AtomicReference])
(:import [java.util.concurrent Executors]
[org.apache.storm.hooks IWorkerHook BaseWorkerHook]
[uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -48,10 +50,9 @@
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
(:import [org.apache.storm StormTimer])
+ (:import [org.apache.storm.executor Executor])
(:gen-class))
-(defmulti mk-suicide-fn cluster-mode)
-
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions]
(log-message "Reading Assignments.")
(let [assignment (:executor->node+port (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)))]
@@ -69,7 +70,7 @@
(let [stats (if-not executors
(StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
(StatsUtil/convertExecutorZkHbs (->> executors
- (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
+ (map (fn [e] {(.getExecutorId e) (.renderStats e)}))
(apply merge))))
zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime worker) upTime))]
;; do the zookeeper heartbeat
@@ -96,7 +97,7 @@
(defn worker-outbound-tasks
"Returns seq of task-ids that receive messages from this worker"
[worker]
- (let [context (worker-context worker)
+ (let [context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
components (mapcat
(fn [task-id]
(->> (.getComponentId context (int task-id))
@@ -145,11 +146,11 @@
assignment-id (:assignment-id worker)
port (:port worker)
storm-cluster-state (:storm-cluster-state worker)
- prev-backpressure-flag @(:backpressure worker)
+ prev-backpressure-flag (.get (:backpressure worker))
;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
curr-backpressure-flag (if executors
(or (.getThrottleOn (:transfer-queue worker))
- (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
+ (reduce #(or %1 %2) (map #(.getBackPressureFlag %1) executors)))
prev-backpressure-flag)]
;; update the worker's backpressure flag to zookeeper only when it has changed
(when (not= prev-backpressure-flag curr-backpressure-flag)
@@ -157,7 +158,7 @@
(log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
(.workerBackpressure storm-cluster-state storm-id assignment-id (long port) curr-backpressure-flag)
;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
- (reset! (:backpressure worker) curr-backpressure-flag)
+ (.set (:backpressure worker) curr-backpressure-flag)
(catch Exception exc
(log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
))))
@@ -276,7 +277,7 @@
mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))]
-
+ ;; TODO: when translating this function, use constants defined in Constants.java
(recursive-map
:conf conf
:mq-context mq-context
@@ -290,9 +291,9 @@
;; when worker bootup, worker will start to setup initial connections to
;; other workers. When all connection is ready, we will enable this flag
;; and spout and bolt will be activated.
- :worker-active-flag (atom false)
- :storm-active-atom (atom false)
- :storm-component->debug-atom (atom {})
+ :worker-active-flag (atom false) ;; used in worker only, keep it as atom
+ :storm-active-atom (AtomicBoolean. false)
+ :storm-component->debug-atom (AtomicReference.)
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
@@ -321,7 +322,7 @@
(mapcat (fn [e] (for [t (executor->tasks e)] [t (first e)])))
(into {})
(HashMap.))
- :suicide-fn (mk-suicide-fn conf)
+ :suicide-fn (Utils/mkSuicideFn)
:uptime (Utils/makeUptimeComputer)
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
@@ -329,10 +330,10 @@
:transfer-fn (mk-transfer-fn <>)
:load-mapping (LoadMapping.)
:assignment-versions assignment-versions
- :backpressure (atom false) ;; whether this worker is going slow
- :transfer-backpressure (atom false) ;; if the transfer queue is backed-up
- :backpressure-trigger (atom false) ;; a trigger for synchronization with executors
- :throttle-on (atom false) ;; whether throttle is activated for spouts
+ :backpressure (AtomicBoolean. false) ;; whether this worker is going slow
+ :transfer-backpressure (AtomicBoolean. false) ;; if the transfer queue is backed-up
+ :backpressure-trigger (AtomicBoolean. false) ;; a trigger for synchronization with executors
+ :throttle-on (AtomicBoolean. false) ;; whether throttle is activated for spouts
)))
(defn- endpoint->string [[node port]]
@@ -422,6 +423,7 @@
current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
+
(swap! (:cached-node+port->socket worker)
#(HashMap. (merge (into {} %1) %2))
(into {}
@@ -455,11 +457,9 @@
(:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
([worker callback]
(let [base (clojurify-storm-base (.stormBase (:storm-cluster-state worker) (:storm-id worker) callback))]
- (reset!
- (:storm-active-atom worker)
- (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
- (reset! (:storm-component->debug-atom worker) (-> base :component->debug))
- (log-debug "Event debug options " @(:storm-component->debug-atom worker)))))
+ (.set (:storm-active-atom worker) (and (= :active (-> base :status :type)) @(:worker-active-flag worker)))
+ (.set (:storm-component->debug-atom worker) (map-val thriftify-debugoptions (-> base :component->debug)))
+ (log-debug "Event debug options " (.get (:storm-component->debug-atom worker))))))
;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
(defn mk-transfer-tuples-handler [worker]
@@ -514,7 +514,7 @@
^IConnection socket (:receiver worker)]
(log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
(.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker)
- (worker-context worker)
+ (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
transfer-local-fn))))
(defn- close-resources [worker]
@@ -605,7 +605,7 @@
(defn run-worker-start-hooks [worker]
(let [topology (:topology worker)
topo-conf (:storm-conf worker)
- worker-topology-context (worker-context worker)
+ worker-topology-context (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker))
hooks (.get_worker_hooks topology)]
(dofor [hook hooks]
(let [hook-bytes (Utils/toByteArray hook)
@@ -680,7 +680,9 @@
_ (run-worker-start-hooks worker)
- _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials)))
+ _ (if (ConfigUtils/isLocalMode storm-conf)
+ (reset! executors (dofor [e (:executors worker)] (local-executor/mk-local-executor worker e initial-credentials)))
+ (reset! executors (dofor [e (:executors worker)] (.execute (Executor/mkExecutor worker e initial-credentials)))))
transfer-tuples (mk-transfer-tuples-handler worker)
@@ -699,7 +701,7 @@
(.start backpressure-thread))
callback (fn cb []
(let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)]
- (reset! (:throttle-on worker) throttle-on)))
+ (.set (:throttle-on worker) throttle-on)))
_ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
(.topologyBackpressure storm-cluster-state storm-id callback))
@@ -766,14 +768,14 @@
(let [new-creds (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id nil))]
(when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
(AuthUtils/updateSubject subject auto-creds new-creds)
- (dofor [e @executors] (.credentials-changed e new-creds))
+ (dofor [e @executors] (.credenetialsChanged e new-creds))
(reset! credentials new-creds))))
check-throttle-changed (fn []
(let [callback (fn cb []
(let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
- (reset! (:throttle-on worker) throttle-on)))
+ (.set (:throttle-on worker) throttle-on)))
new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)]
- (reset! (:throttle-on worker) new-throttle-on)))
+ (.set (:throttle-on worker) new-throttle-on)))
check-log-config-changed (fn []
(let [log-config (.topologyLogConfig (:storm-cluster-state worker) storm-id nil)]
(process-log-config-change latest-log-config original-log-levels log-config)
@@ -809,14 +811,6 @@
ret
))))))
-(defmethod mk-suicide-fn
- :local [conf]
- (fn [] (Utils/exitProcess 1 "Worker died")))
-
-(defmethod mk-suicide-fn
- :distributed [conf]
- (fn [] (Utils/exitProcess 1 "Worker died")))
-
(defn -main [storm-id assignment-id port-str worker-id]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
(Utils/setupDefaultUncaughtExceptionHandler)
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index d67b48d..dc676d6 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -17,15 +17,17 @@
(ns org.apache.storm.testing
(:require [org.apache.storm.daemon
[nimbus :as nimbus]
+ [local-executor :as local-executor]
[local-supervisor :as local-supervisor]
[common :as common]
- [worker :as worker]
- [executor :as executor]])
+ [worker :as worker]])
(:import [org.apache.commons.io FileUtils]
[org.apache.storm.utils]
[org.apache.storm.zookeeper Zookeeper]
[org.apache.storm ProcessSimulator]
- [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager])
+ [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]
+ [org.apache.storm.executor Executor]
+ [java.util.concurrent.atomic AtomicBoolean])
(:import [java.io File])
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
@@ -697,12 +699,12 @@
;; of tuple emission (and not on a separate thread later) for
;; topologies to be tracked correctly. This is because "transferred" *must*
;; be incremented before "processing".
- executor/mk-executor-transfer-fn
- (let [old# executor/mk-executor-transfer-fn]
+ local-executor/local-transfer-executor-tuple
+ (let [old# local-executor/local-transfer-executor-tuple]
(fn [& args#]
(let [transferrer# (apply old# args#)]
(fn [& args2#]
- ;; (log-message "Transferring: " transfer-args#)
+ ;; (log-message "Transferring: " args2#)
(increment-global! id# "transferred" 1)
(apply transferrer# args2#)))))]
(with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
@@ -759,7 +761,7 @@
{}
(HashMap.)
(HashMap.)
- (atom false))]
+ (AtomicBoolean. false))]
(TupleImpl. context values 1 stream)))
(defmacro with-timeout
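
The tracked-cluster redefinition above wraps local-transfer-executor-tuple so the shared "transferred" counter is incremented at emission time, before the underlying transfer runs, which is what keeps "transferred" ahead of "processing". The same decorator idea in a hedged Java sketch, using hypothetical functional types rather than the testing.clj vars:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.BiConsumer;

    public class TrackedTransferSketch {
        // Wrap a transfer function so a counter is bumped before delegating,
        // mirroring how the tracked cluster redefines local-transfer-executor-tuple.
        static BiConsumer<Integer, Object> tracked(AtomicLong transferred,
                                                   BiConsumer<Integer, Object> delegate) {
            return (task, tuple) -> {
                transferred.incrementAndGet(); // "transferred" must lead "processing"
                delegate.accept(task, tuple);
            };
        }

        public static void main(String[] args) {
            AtomicLong transferred = new AtomicLong();
            BiConsumer<Integer, Object> base = (task, tuple) -> { /* hand off to the local queue */ };
            BiConsumer<Integer, Object> wrapped = tracked(transferred, base);
            wrapped.accept(3, "tuple");
            System.out.println(transferred.get()); // 1
        }
    }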
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Constants.java b/storm-core/src/jvm/org/apache/storm/Constants.java
index b2c642e..9436db1 100644
--- a/storm-core/src/jvm/org/apache/storm/Constants.java
+++ b/storm-core/src/jvm/org/apache/storm/Constants.java
@@ -22,7 +22,7 @@ import clojure.lang.RT;
public class Constants {
- public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
+ public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
@@ -32,5 +32,27 @@ public class Constants {
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
+
+ public static final Object TOPOLOGY = "topology";
+ public static final String SYSTEM_TOPOLOGY = "system-topology";
+ public static final String STORM_CONF = "storm-conf";
+ public static final String STORM_ID = "storm-id";
+ public static final String WORKER_ID = "worker-id";
+ public static final String CONF = "conf";
+ public static final String PORT = "port";
+ public static final String TASK_TO_COMPONENT = "task->component";
+ public static final String COMPONENT_TO_SORTED_TASKS = "component->sorted-tasks";
+ public static final String COMPONENT_TO_STREAM_TO_FIELDS = "component->stream->fields";
+ public static final String TASK_IDS = "task-ids";
+ public static final String DEFAULT_SHARED_RESOURCES = "default-shared-resources";
+ public static final String USER_SHARED_RESOURCES = "user-shared-resources";
+ public static final String USER_TIMER = "user-timer";
+ public static final String TRANSFER_FN = "transfer-fn";
+ public static final String SUICIDE_FN = "suicide-fn";
+ public static final String THROTTLE_ON = "throttle-on";
+ public static final String EXECUTOR_RECEIVE_QUEUE_MAP = "executor-receive-queue-map";
+ public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
+ public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
+ public static final Object LOAD_MAPPING = "load-mapping";
}
[2/5] storm git commit: STORM-1277 port backtype.storm.daemon.executor to java
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index cfc435f..391c2d3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -39,6 +39,8 @@ import org.apache.storm.metric.filter.FilterByMetricName;
import org.apache.storm.metric.util.DataPointExpander;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.task.IBolt;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.IPredicate;
import org.apache.storm.utils.ThriftTopologyUtils;
@@ -47,6 +49,7 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -65,6 +68,7 @@ public class StormCommon {
* Provide an instance of this class for delegates to use. To mock out
* delegated methods, provide an instance of a subclass that overrides the
* implementation of the delegated method.
+ *
* @param common a StormCommon instance
* @return the previously set instance
*/
@@ -90,6 +94,7 @@ public class StormCommon {
public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType";
public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator";
+ @SuppressWarnings("unchecked")
public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) {
List<String> activeTopologys = stormClusterState.activeStorms();
IPredicate pred = new IPredicate<String>() {
@@ -108,7 +113,7 @@ public class StormCommon {
protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
List<String> activeTopologys = stormClusterState.activeStorms();
- Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
+ Map<String, StormBase> stormBases = new HashMap<>();
for (String topologyId : activeTopologys) {
StormBase base = stormClusterState.stormBase(topologyId, null);
stormBases.put(topologyId, base);
@@ -122,11 +127,12 @@ public class StormCommon {
}
}
+ @SuppressWarnings("unchecked")
private static void validateIds(StormTopology topology) throws InvalidTopologyException {
- List<String> componentIds = new ArrayList<String>();
+ List<String> componentIds = new ArrayList<>();
for (StormTopology._Fields field : Thrift.getTopologyFields()) {
- if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ if (!ThriftTopologyUtils.isWorkerHook(field)) {
Object value = topology.getFieldValue(field);
Map<String, Object> componentMap = (Map<String, Object>) value;
componentIds.addAll(componentMap.keySet());
@@ -149,7 +155,7 @@ public class StormCommon {
}
List<String> offending = Utils.getRepeat(componentIds);
- if (offending.isEmpty() == false) {
+ if (!offending.isEmpty()) {
throw new InvalidTopologyException("Duplicate component ids: " + offending);
}
}
@@ -162,19 +168,21 @@ public class StormCommon {
}
}
+ @SuppressWarnings("unchecked")
public static Map<String, Object> allComponents(StormTopology topology) {
- Map<String, Object> components = new HashMap<String, Object>();
+ Map<String, Object> components = new HashMap<>();
List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
for (StormTopology._Fields field : topologyFields) {
- if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ if (!ThriftTopologyUtils.isWorkerHook(field)) {
components.putAll(((Map) topology.getFieldValue(field)));
}
}
return components;
}
+ @SuppressWarnings("unchecked")
public static Map componentConf(Object component) {
- Map<Object, Object> conf = new HashMap<Object, Object>();
+ Map<Object, Object> conf = new HashMap<>();
ComponentCommon common = getComponentCommon(component);
String jconf = common.get_json_conf();
if (jconf != null) {
@@ -183,6 +191,7 @@ public class StormCommon {
return conf;
}
+ @SuppressWarnings("unchecked")
public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
validateIds(topology);
@@ -191,7 +200,7 @@ public class StormCommon {
if (spoutComponents != null) {
for (Object obj : spoutComponents.values()) {
ComponentCommon common = getComponentCommon(obj);
- if (isEmptyInputs(common) == false) {
+ if (!isEmptyInputs(common)) {
throw new InvalidTopologyException("May not declare inputs for a spout");
}
}
@@ -211,7 +220,7 @@ public class StormCommon {
}
private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
- Set<String> outputFields = new HashSet<String>();
+ Set<String> outputFields = new HashSet<>();
for (StreamInfo streamInfo : streams.values()) {
outputFields.addAll(streamInfo.get_output_fields());
}
@@ -227,24 +236,27 @@ public class StormCommon {
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
String sourceStreamId = input.getKey().get_streamId();
String sourceComponentId = input.getKey().get_componentId();
- if(componentMap.keySet().contains(sourceComponentId) == false) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
+ if (!componentMap.keySet().contains(sourceComponentId)) {
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from non-existent component [" + sourceComponentId + "]");
}
ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
- if (sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
+ if (!sourceComponent.get_streams().containsKey(sourceStreamId)) {
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from non-existent stream: " +
"[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
}
Grouping grouping = input.getValue();
if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
- List<String> fields = new ArrayList<String>(grouping.get_fields());
+ List<String> fields = new ArrayList<>(grouping.get_fields());
Map<String, StreamInfo> streams = sourceComponent.get_streams();
Set<String> sourceOutputFields = getStreamOutputFields(streams);
fields.removeAll(sourceOutputFields);
if (fields.size() != 0) {
- throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId +"] of component " +
+ throw new InvalidTopologyException("Component: [" + componentId +
+ "] subscribes from stream: [" + sourceStreamId + "] of component " +
"[" + sourceComponentId + "] + with non-existent fields: " + fields);
}
}
@@ -253,18 +265,22 @@ public class StormCommon {
}
public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
- Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
Set<String> boltIds = topology.get_bolts().keySet();
Set<String> spoutIds = topology.get_spouts().keySet();
- for(String id : spoutIds) {
- inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ for (String id : spoutIds) {
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
- for(String id : boltIds) {
- inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
- inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
- inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ for (String id : boltIds) {
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("id")));
}
return inputs;
}
@@ -272,10 +288,12 @@ public class StormCommon {
public static IBolt makeAckerBolt() {
return _instance.makeAckerBoltImpl();
}
+
public IBolt makeAckerBoltImpl() {
return new Acker();
}
+ @SuppressWarnings("unchecked")
public static void addAcker(Map conf, StormTopology topology) {
int ackerNum = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
@@ -285,13 +303,13 @@ public class StormCommon {
outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
- Map<String, Object> ackerConf = new HashMap<String, Object>();
+ Map<String, Object> ackerConf = new HashMap<>();
ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
- for(Bolt bolt : topology.get_bolts().values()) {
+ for (Bolt bolt : topology.get_bolts().values()) {
ComponentCommon common = bolt.get_common();
common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
@@ -301,12 +319,17 @@ public class StormCommon {
for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
Map spoutConf = componentConf(spout);
- spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+ spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+ Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
- common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
- common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
- common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
- common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareDirectGrouping());
+ common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
+ Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
+ Thrift.prepareDirectGrouping());
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
+ Thrift.prepareDirectGrouping());
+ common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
+ Thrift.prepareDirectGrouping());
}
topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
@@ -341,9 +364,8 @@ public class StormCommon {
}
public static List<String> eventLoggerBoltFields() {
- List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
- EventLoggerBolt.FIELD_VALUES);
- return fields;
+ return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
+ EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
}
public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
@@ -352,34 +374,38 @@ public class StormCommon {
allIds.addAll(topology.get_bolts().keySet());
allIds.addAll(topology.get_spouts().keySet());
- for(String id : allIds) {
- inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+ for (String id : allIds) {
+ inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
+ Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
}
return inputs;
}
public static void addEventLogger(Map conf, StormTopology topology) {
- Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
- HashMap<String, Object> componentConf = new HashMap<String, Object>();
+ Integer numExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+ Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ HashMap<String, Object> componentConf = new HashMap<>();
componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
- Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+ Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
+ eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
- for(Object component : allComponents(topology).values()) {
+ for (Object component : allComponents(topology).values()) {
ComponentCommon common = getComponentCommon(component);
common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
}
topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}
+ @SuppressWarnings("unchecked")
public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
- Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
+ Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
- Set<String> componentIdsEmitMetrics = new HashSet<String>();
+ Set<String> componentIdsEmitMetrics = new HashSet<>();
componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
- Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
for (String componentId : componentIdsEmitMetrics) {
inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
}
@@ -433,13 +459,14 @@ public class StormCommon {
}
}
+ @SuppressWarnings("unused")
public static void addSystemComponents(Map conf, StormTopology topology) {
- Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+ Map<String, StreamInfo> outputStreams = new HashMap<>();
outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
- Map<String, Object> boltConf = new HashMap<String, Object>();
+ Map<String, Object> boltConf = new HashMap<>();
boltConf.put(Config.TOPOLOGY_TASKS, 0);
Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
@@ -468,20 +495,12 @@ public class StormCommon {
public static boolean hasAckers(Map stormConf) {
Object ackerNum = stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
- if (ackerNum == null || Utils.getInt(ackerNum) > 0) {
- return true;
- } else {
- return false;
- }
+ return ackerNum == null || Utils.getInt(ackerNum) > 0;
}
public static boolean hasEventLoggers(Map stormConf) {
Object eventLoggerNum = stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS);
- if (eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0) {
- return true;
- } else {
- return false;
- }
+ return eventLoggerNum == null || Utils.getInt(eventLoggerNum) > 0;
}
public static int numStartExecutors(Object component) throws InvalidTopologyException {
@@ -492,15 +511,16 @@ public class StormCommon {
public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
return _instance.stormTaskInfoImpl(userTopology, stormConf);
}
+
/*
* Returns map from task -> componentId
*/
protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
- Map<Integer, String> taskIdToComponentId = new HashMap<Integer, String>();
+ Map<Integer, String> taskIdToComponentId = new HashMap<>();
StormTopology systemTopology = systemTopology(stormConf, userTopology);
Map<String, Object> components = allComponents(systemTopology);
- Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
+ Map<String, Integer> componentIdToTaskNum = new TreeMap<>();
for (Map.Entry<String, Object> entry : components.entrySet()) {
Map conf = componentConf(entry.getValue());
Object taskNum = conf.get(Config.TOPOLOGY_TASKS);
@@ -521,7 +541,7 @@ public class StormCommon {
}
public static List<Integer> executorIdToTasks(List<Long> executorId) {
- List<Integer> taskIds = new ArrayList<Integer>();
+ List<Integer> taskIds = new ArrayList<>();
int taskId = executorId.get(0).intValue();
while (taskId <= executorId.get(1).intValue()) {
taskIds.add(taskId);
@@ -530,22 +550,24 @@ public class StormCommon {
return taskIds;
}
- public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodeport) {
- Map<Integer, NodeInfo> tasksToNodeport = new HashMap<Integer, NodeInfo>();
- for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodeport.entrySet()) {
+ public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodePort) {
+ Map<Integer, NodeInfo> tasksToNodePort = new HashMap<>();
+ for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
List<Integer> taskIds = executorIdToTasks(entry.getKey());
for (Integer taskId : taskIds) {
- tasksToNodeport.put(taskId, entry.getValue());
+ tasksToNodePort.put(taskId, entry.getValue());
}
}
- return tasksToNodeport;
+ return tasksToNodePort;
}
- public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf)
+ throws IllegalAccessException, InstantiationException, ClassNotFoundException {
return _instance.mkAuthorizationHandlerImpl(klassName, conf);
}
- protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
IAuthorizer aznHandler = null;
if (klassName != null) {
Class aznClass = Class.forName(klassName);
@@ -554,10 +576,35 @@ public class StormCommon {
if (aznHandler != null) {
aznHandler.prepare(conf);
}
- LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+ LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
}
}
return aznHandler;
}
+
+ @SuppressWarnings("unchecked")
+ public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+ try {
+ StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+ Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+ Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+ Map<String, List<Integer>> componentToSortedTasks =
+ (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
+ Map<String, Map<String, Fields>> componentToStreamToFields =
+ (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+ String stormId = (String) workerData.get(Constants.STORM_ID);
+ Map conf = (Map) workerData.get(Constants.CONF);
+ Integer port = (Integer) workerData.get(Constants.PORT);
+ String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+ String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+ List<Integer> workerTasks = (List<Integer>) workerData.get(Constants.TASK_IDS);
+ Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+ Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
+ return new WorkerTopologyContext(stormTopology, stormConf, taskToComponent, componentToSortedTasks,
+ componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
+ } catch (IOException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
}
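
makeWorkerContext pulls everything it needs out of the worker-data map using the string keys added to Constants.java, which is why worker.clj now calls it as (StormCommon/makeWorkerContext (Utils/convertClojureMapToJavaMap worker)). A hedged sketch of what that map looks like from the Java side; the values are placeholders only, a real worker map also carries the system topology, conf, queues, and shared resources, and this assumes storm-core is on the classpath:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Constants;

    public class WorkerDataSketch {
        public static void main(String[] args) {
            // Placeholder values only; the real map is built in worker.clj and handed to
            // StormCommon.makeWorkerContext after Utils.convertClojureMapToJavaMap.
            Map<String, Object> workerData = new HashMap<>();
            workerData.put(Constants.STORM_ID, "demo-topology-1-1");
            workerData.put(Constants.PORT, 6700);
            workerData.put(Constants.TASK_IDS, Arrays.asList(1, 2, 3));
            workerData.put(Constants.TASK_TO_COMPONENT, new HashMap<Integer, String>());
            workerData.put(Constants.COMPONENT_TO_SORTED_TASKS, new HashMap<String, Object>());
            System.out.println("worker-data keys: " + workerData.keySet());
            // makeWorkerContext(workerData) would additionally need CONF, STORM_CONF,
            // SYSTEM_TOPOLOGY, COMPONENT_TO_STREAM_TO_FIELDS and the shared-resource entries.
        }
    }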
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Task.java b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
index 60b570a..2e846ff 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
@@ -18,9 +18,11 @@
package org.apache.storm.daemon;
import org.apache.storm.Config;
+import org.apache.storm.Constants;
import org.apache.storm.Thrift;
import org.apache.storm.daemon.metrics.BuiltinMetrics;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.JavaObject;
@@ -32,7 +34,6 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.hooks.info.EmitInfo;
-import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.stats.CommonStats;
import org.apache.storm.task.ShellBolt;
@@ -57,7 +58,7 @@ public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
- private Map executorData;
+ private Executor executor;
private Map workerData;
private TopologyContext systemTopologyContext;
private TopologyContext userTopologyContext;
@@ -65,7 +66,7 @@ public class Task {
private LoadMapping loadMapping;
private Integer taskId;
private String componentId;
- private Object taskObject; // Spout/Bolt object
+ private Object taskObject; // Spout/Bolt object
private Map stormConf;
private Callable<Boolean> emitSampler;
private CommonStats executorStats;
@@ -73,20 +74,20 @@ public class Task {
private BuiltinMetrics builtInMetrics;
private boolean debug;
- public Task(Map executorData, Integer taskId) throws IOException {
+ public Task(Executor executor, Integer taskId) throws IOException {
this.taskId = taskId;
- this.executorData = executorData;
- this.workerData = (Map) executorData.get("worker");
- this.stormConf = (Map) executorData.get("storm-conf");
- this.componentId = (String) executorData.get("component-id");
- this.streamComponentToGrouper = (Map<String, Map<String, LoadAwareCustomStreamGrouping>>) executorData.get("stream->component->grouper");
- this.executorStats = (CommonStats) executorData.get("stats");
- this.builtInMetrics = BuiltinMetricsUtil.mkData((String) executorData.get("type"), this.executorStats);
- this.workerTopologyContext = (WorkerTopologyContext) executorData.get("worker-context");
+ this.executor = executor;
+ this.workerData = executor.getWorkerData();
+ this.stormConf = executor.getStormConf();
+ this.componentId = executor.getComponentId();
+ this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+ this.executorStats = executor.getStats();
+ this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
+ this.workerTopologyContext = executor.getWorkerTopologyContext();
this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
- this.loadMapping = (LoadMapping) workerData.get("load-mapping");
- this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get("system-topology"));
- this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get("topology"));
+ this.loadMapping = (LoadMapping) workerData.get(Constants.LOAD_MAPPING);
+ this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY));
+ this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get(Constants.TOPOLOGY));
this.taskObject = mkTaskObject();
this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
this.addTaskHooks();
@@ -119,13 +120,14 @@ public class Task {
if (null != outTaskId) {
return Collections.singletonList(outTaskId);
}
- return null;
+ return new ArrayList<>(0);
}
public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting: {} {} {}", componentId, stream, values);
}
+
List<Integer> outTasks = new ArrayList<>();
if (!streamComponentToGrouper.containsKey(stream)) {
throw new IllegalArgumentException("Unknown stream ID: " + stream);
@@ -164,7 +166,7 @@ public class Task {
return componentId;
}
- public TopologyContext getUserContext() throws IOException {
+ public TopologyContext getUserContext() {
return userTopologyContext;
}
@@ -177,25 +179,25 @@ public class Task {
}
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
- Map conf = (Map) workerData.get("conf");
+ Map conf = (Map) workerData.get(Constants.CONF);
return new TopologyContext(
- topology,
- (Map) workerData.get("storm-conf"),
- (Map<Integer, String>) workerData.get("task->component"),
- (Map<String, List<Integer>>) workerData.get("component->sorted-tasks"),
- (Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"),
- (String) workerData.get("storm-id"),
- ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))),
- ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")),
- taskId,
- (Integer) workerData.get("port"),
- (List<Integer>) workerData.get("task-ids"),
- (Map<String, Object>) workerData.get("default-shared-resources"),
- (Map<String, Object>) workerData.get("user-shared-resources"),
- (Map<String, Object>) executorData.get("shared-executor-data"),
- (Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"),
- (clojure.lang.Atom) executorData.get("open-or-prepare-was-called?")
- );
+ topology,
+ (Map) workerData.get(Constants.STORM_CONF),
+ (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT),
+ (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS),
+ (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS),
+ (String) workerData.get(Constants.STORM_ID),
+ ConfigUtils.supervisorStormResourcesPath(
+ ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get(Constants.STORM_ID))),
+ ConfigUtils.workerPidsRoot(conf, (String) workerData.get(Constants.WORKER_ID)),
+ taskId,
+ (Integer) workerData.get(Constants.PORT),
+ (List<Integer>) workerData.get(Constants.TASK_IDS),
+ (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES),
+ (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES),
+ executor.getSharedExecutorData(),
+ executor.getIntervalToTaskToMetricToRegistry(),
+ executor.getOpenOrPrepareWasCalled());
}
private Object mkTaskObject() {
@@ -203,8 +205,8 @@ public class Task {
Map<String, SpoutSpec> spouts = topology.get_spouts();
Map<String, Bolt> bolts = topology.get_bolts();
Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts();
- Object result = null;
- ComponentObject componentObject = null;
+ Object result;
+ ComponentObject componentObject;
if (spouts.containsKey(componentId)) {
componentObject = spouts.get(componentId).get_spout_object();
} else if (bolts.containsKey(componentId)) {
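
One behavioral change worth noting in Task: the getOutgoingTasks branch shown above now returns an empty list rather than null when the target task id is not resolved, so callers can iterate the result unconditionally. A small self-contained sketch of the pattern this enables (illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    final class EmptyListOverNullSketch {
        // Same shape as the changed branch: never hand back null.
        static List<Integer> outgoingTasks(Integer outTaskId) {
            if (null != outTaskId) {
                return Collections.singletonList(outTaskId);
            }
            return new ArrayList<>(0); // previously: return null
        }

        public static void main(String[] args) {
            // An unresolved task id simply emits to nobody; no null check needed.
            for (Integer task : outgoingTasks(null)) {
                System.out.println("transfer to task " + task);
            }
            System.out.println("done without a null check");
        }
    }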
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/Executor.java b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
new file mode 100644
index 0000000..614d44d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+ protected final Map workerData;
+ protected final WorkerTopologyContext workerTopologyContext;
+ protected final List<Long> executorId;
+ protected final List<Integer> taskIds;
+ protected final String componentId;
+ protected final AtomicBoolean openOrPrepareWasCalled;
+ protected final Map stormConf;
+ protected final Map conf;
+ protected final String stormId;
+ protected final HashMap sharedExecutorData;
+ protected final AtomicBoolean stormActive;
+ protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+ protected final Runnable suicideFn;
+ protected final IStormClusterState stormClusterState;
+ protected final Map<Integer, String> taskToComponent;
+ protected CommonStats stats;
+ protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+ protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+ protected final ReportErrorAndDie reportErrorDie;
+ protected final Callable<Boolean> sampler;
+ protected final AtomicBoolean backpressure;
+ protected ExecutorTransfer executorTransfer;
+ protected final String type;
+ protected final AtomicBoolean throttleOn;
+ protected IFn transferFn;
+
+ protected final IReportError reportError;
+ protected final Random rand;
+ protected final DisruptorQueue transferQueue;
+ protected final DisruptorQueue receiveQueue;
+ protected Map<Integer, Task> idToTask;
+ protected final Map<String, String> credentials;
+ protected final Boolean isDebug;
+ protected final Boolean hasEventLoggers;
+ protected String hostname;
+
+ protected Executor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+ this.workerData = workerData;
+ this.executorId = executorId;
+ this.workerTopologyContext = StormCommon.makeWorkerContext(workerData);
+ this.taskIds = StormCommon.executorIdToTasks(executorId);
+ this.componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+ this.openOrPrepareWasCalled = new AtomicBoolean(false);
+ this.stormConf = normalizedComponentConf((Map) workerData.get(Constants.STORM_CONF), workerTopologyContext, componentId);
+ this.receiveQueue = (DisruptorQueue) (((Map) workerData.get(Constants.EXECUTOR_RECEIVE_QUEUE_MAP)).get(executorId));
+ this.stormId = (String) workerData.get(Constants.STORM_ID);
+ this.conf = (Map) workerData.get(Constants.CONF);
+ this.sharedExecutorData = new HashMap();
+ this.stormActive = (AtomicBoolean) workerData.get(Constants.STORM_ACTIVE_ATOM);
+ this.stormComponentDebug = (AtomicReference<Map<String, DebugOptions>>) workerData.get(Constants.COMPONENT_TO_DEBUG_ATOM);
+
+ this.transferQueue = mkExecutorBatchQueue(stormConf, executorId);
+ this.transferFn = (IFn) workerData.get(Constants.TRANSFER_FN);
+ this.executorTransfer = new ExecutorTransfer(workerTopologyContext, transferQueue, stormConf, transferFn);
+
+ this.suicideFn = (Runnable) workerData.get(Constants.SUICIDE_FN);
+ try {
+ this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.get("state-store"), Utils.getWorkerACL(stormConf),
+ new ClusterStateContext(DaemonType.WORKER));
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ StormTopology topology = workerTopologyContext.getRawTopology();
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ Map<String, Bolt> bolts = topology.get_bolts();
+ if (spouts.containsKey(componentId)) {
+ this.type = StatsUtil.SPOUT;
+ this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(stormConf));
+ } else if (bolts.containsKey(componentId)) {
+ this.type = StatsUtil.BOLT;
+ this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(stormConf));
+ } else {
+ throw new RuntimeException("Could not find " + componentId + " in " + topology);
+ }
+
+ this.intervalToTaskToMetricToRegistry = new HashMap<>();
+ this.taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+ this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, stormConf);
+ this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext);
+ this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
+ this.sampler = ConfigUtils.mkStatsSampler(stormConf);
+ this.backpressure = new AtomicBoolean(false);
+ this.throttleOn = (AtomicBoolean) workerData.get(Constants.THROTTLE_ON);
+ this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.rand = new Random(Utils.secureRandomLong());
+ this.credentials = credentials;
+ this.hasEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+ try {
+ this.hostname = Utils.hostname(stormConf);
+ } catch (UnknownHostException ignored) {
+ this.hostname = "";
+ }
+ }
+
+ public static Executor mkExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+ Executor executor;
+
+ Map<String, Object> convertedWorkerData = Utils.convertClojureMapToJavaMap(workerData);
+ WorkerTopologyContext workerTopologyContext = StormCommon.makeWorkerContext(convertedWorkerData);
+ List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
+ String componentId = workerTopologyContext.getComponentId(taskIds.get(0));
+
+ String type = getExecutorType(workerTopologyContext, componentId);
+ if (StatsUtil.SPOUT.equals(type)) {
+ executor = new SpoutExecutor(convertedWorkerData, executorId, credentials);
+ executor.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
+ } else {
+ executor = new BoltExecutor(convertedWorkerData, executorId, credentials);
+ executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
+ }
+
+ Map<Integer, Task> idToTask = new HashMap<>();
+ for (Integer taskId : taskIds) {
+ try {
+ Task task = new Task(executor, taskId);
+ executor.sendUnanchored(
+ task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+ idToTask.put(taskId, task);
+ } catch (IOException ex) {
+ throw Utils.wrapInRuntime(ex);
+ }
+ }
+ executor.init(idToTask);
+
+ return executor;
+ }
+
+ private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
+ StormTopology topology = workerTopologyContext.getRawTopology();
+ Map<String, SpoutSpec> spouts = topology.get_spouts();
+ Map<String, Bolt> bolts = topology.get_bolts();
+ if (spouts.containsKey(componentId)) {
+ return StatsUtil.SPOUT;
+ } else if (bolts.containsKey(componentId)) {
+ return StatsUtil.BOLT;
+ } else {
+ throw new RuntimeException("Could not find " + componentId + " in " + topology);
+ }
+ }
+
+ /**
+ * Separated from mkExecutor so that tests can replace the executor transfer held in the executor data.
+ */
+ public ExecutorShutdown execute() throws Exception {
+ LOG.info("Loading executor tasks " + componentId + ":" + executorId);
+
+ registerBackpressure();
+ Utils.SmartThread systemThreads =
+ Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
+
+ String handlerName = componentId + "-executor" + executorId;
+ Utils.SmartThread handlers =
+ Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
+ setupTicks(StatsUtil.SPOUT.equals(type));
+
+ LOG.info("Finished loading executor " + componentId + ":" + executorId);
+ return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ }
+
+ public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
+
+ public abstract void init(Map<Integer, Task> idToTask);
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
+ ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
+ for (AddressedTuple addressedTuple : addressedTuples) {
+ TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ int taskId = addressedTuple.getDest();
+ if (isDebug) {
+ LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ }
+ if (taskId != AddressedTuple.BROADCAST_DEST) {
+ tupleActionFn(taskId, tuple);
+ } else {
+ for (Integer t : taskIds) {
+ tupleActionFn(t, tuple);
+ }
+ }
+ }
+ }
+
+ public void metricsTick(Task taskData, TupleImpl tuple) {
+ try {
+ Integer interval = tuple.getInteger(0);
+ int taskId = taskData.getTaskId();
+ Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
+ Map<String, IMetric> nameToRegistry = null;
+ if (taskToMetricToRegistry != null) {
+ nameToRegistry = taskToMetricToRegistry.get(taskId);
+ }
+ if (nameToRegistry != null) {
+ IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+ hostname, workerTopologyContext.getThisWorkerPort(),
+ componentId, taskId, Time.currentTimeSecs(), interval);
+ List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
+ for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
+ IMetric metric = entry.getValue();
+ Object value = metric.getValueAndReset();
+ if (value != null) {
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+ dataPoints.add(dataPoint);
+ }
+ }
+ if (!dataPoints.isEmpty()) {
+ sendUnanchored(taskData, Constants.METRICS_STREAM_ID,
+ new Values(taskInfo, dataPoints), executorTransfer);
+ }
+ }
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ protected void setupMetrics() {
+ for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
+ StormTimer timerTask = (StormTimer) workerData.get(Constants.USER_TIMER);
+ timerTask.scheduleRecurring(interval, interval, new Runnable() {
+ @Override
+ public void run() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval),
+ (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+ List<AddressedTuple> metricsTickTuple =
+ Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ receiveQueue.publish(metricsTickTuple);
+ }
+ });
+ }
+ }
+
+ public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
+ Tuple tuple = task.getTuple(stream, values);
+ List<Integer> tasks = task.getOutgoingTasks(stream, values);
+ for (Integer t : tasks) {
+ transfer.transfer(t, tuple);
+ }
+ }
+
+ /**
+ * Sends sampled data to the event logger if the global or component-level debug flag is set (via the Nimbus API).
+ */
+ public void sendToEventLogger(Executor executor, Task taskData, List values,
+ String componentId, Object messageId, Random random) {
+ Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
+ DebugOptions debugOptions = componentDebug.get(componentId);
+ if (debugOptions == null) {
+ debugOptions = componentDebug.get(executor.getStormId());
+ }
+ double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
+ if (spct > 0 && (random.nextDouble() * 100) < spct) {
+ sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
+ new Values(componentId, messageId, System.currentTimeMillis(), values),
+ executor.getExecutorTransfer());
+ }
+ }
+
+ private void registerBackpressure() {
+ receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
+ @Override
+ public void highWaterMark() throws Exception {
+ if (!backpressure.get()) {
+ backpressure.set(true);
+ LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
+ }
+ }
+
+ @Override
+ public void lowWaterMark() throws Exception {
+ if (backpressure.get()) {
+ backpressure.set(false);
+ LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
+ WorkerBackpressureThread.notifyBackpressureChecker(workerData.get("backpressure-trigger"));
+ }
+ }
+ });
+ receiveQueue.setHighWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
+ receiveQueue.setLowWaterMark(Utils.getDouble(stormConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
+ receiveQueue.setEnableBackpressure(Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false));
+ }
+
+ protected void setupTicks(boolean isSpout) {
+ final Integer tickTimeSecs = Utils.getInt(stormConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
+ boolean enableMessageTimeout = (Boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
+ if (tickTimeSecs != null) {
+ if (Utils.isSystemId(componentId) || (!enableMessageTimeout && isSpout)) {
+ LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId);
+ } else {
+ StormTimer timerTask = (StormTimer) workerData.get(Constants.USER_TIMER);
+ timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable() {
+ @Override
+ public void run() {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
+ (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ List<AddressedTuple> tickTuple =
+ Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ receiveQueue.publish(tickTuple);
+ }
+ });
+ }
+ }
+ }
+
+
+ private DisruptorQueue mkExecutorBatchQueue(Map stormConf, List<Long> executorId) {
+ int sendSize = Utils.getInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
+ int waitTimeOutMs = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
+ int batchSize = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
+ int batchTimeOutMs = Utils.getInt(stormConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+ return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE,
+ sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+ }
+
+ /**
+ * Returns a map of stream id to component id to grouper.
+ */
+ private Map<String, Map<String, LoadAwareCustomStreamGrouping>> outboundComponents(
+ WorkerTopologyContext workerTopologyContext, String componentId, Map stormConf) {
+ Map<String, Map<String, LoadAwareCustomStreamGrouping>> ret = new HashMap<>();
+
+ Map<String, Map<String, Grouping>> outputGroupings = workerTopologyContext.getTargets(componentId);
+ for (Map.Entry<String, Map<String, Grouping>> entry : outputGroupings.entrySet()) {
+ String streamId = entry.getKey();
+ Map<String, Grouping> componentGrouping = entry.getValue();
+ Fields outFields = workerTopologyContext.getComponentOutputFields(componentId, streamId);
+ Map<String, LoadAwareCustomStreamGrouping> componentGrouper = new HashMap<String, LoadAwareCustomStreamGrouping>();
+ for (Map.Entry<String, Grouping> cg : componentGrouping.entrySet()) {
+ String component = cg.getKey();
+ Grouping grouping = cg.getValue();
+ List<Integer> outTasks = workerTopologyContext.getComponentTasks(component);
+ LoadAwareCustomStreamGrouping grouper = GrouperFactory.mkGrouper(
+ workerTopologyContext, componentId, streamId, outFields, grouping, outTasks, stormConf);
+ componentGrouper.put(component, grouper);
+ }
+ if (componentGrouper.size() > 0) {
+ ret.put(streamId, componentGrouper);
+ }
+ }
+
+ for (String stream : workerTopologyContext.getComponentCommon(componentId).get_streams().keySet()) {
+ if (!ret.containsKey(stream)) {
+ ret.put(stream, null);
+ }
+ }
+
+ return ret;
+ }
+
+ private Map normalizedComponentConf(Map stormConf, WorkerTopologyContext topologyContext, String componentId) {
+ List<Object> keysToRemove = ConfigUtils.All_CONFIGS();
+ keysToRemove.remove(Config.TOPOLOGY_DEBUG);
+ keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
+ keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
+ keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
+ keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
+ keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
+ keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
+ keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
+ keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+
+ Map<Object, Object> componentConf;
+ String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
+ if (specJsonConf != null) {
+ componentConf = (Map<Object, Object>) JSONValue.parse(specJsonConf);
+ for (Object p : keysToRemove) {
+ componentConf.remove(p);
+ }
+ } else {
+ componentConf = new HashMap<>();
+ }
+
+ Map<Object, Object> ret = new HashMap<>();
+ ret.putAll(stormConf);
+ ret.putAll(componentConf);
+
+ return ret;
+ }
+
+ // =============================================================================
+ // ============================ getter methods =================================
+ // =============================================================================
+
+ public List<Long> getExecutorId() {
+ return executorId;
+ }
+
+ public List<Integer> getTaskIds() {
+ return taskIds;
+ }
+
+ public String getComponentId() {
+ return componentId;
+ }
+
+ public AtomicBoolean getOpenOrPrepareWasCalled() {
+ return openOrPrepareWasCalled;
+ }
+
+ public Map getStormConf() {
+ return stormConf;
+ }
+
+ public String getStormId() {
+ return stormId;
+ }
+
+ public CommonStats getStats() {
+ return stats;
+ }
+
+ public AtomicBoolean getThrottleOn() {
+ return throttleOn;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public Boolean getIsDebug() {
+ return isDebug;
+ }
+
+ public ExecutorTransfer getExecutorTransfer() {
+ return executorTransfer;
+ }
+
+ public IReportError getReportError() {
+ return reportError;
+ }
+
+ public WorkerTopologyContext getWorkerTopologyContext() {
+ return workerTopologyContext;
+ }
+
+ public Callable<Boolean> getSampler() {
+ return sampler;
+ }
+
+ public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() {
+ return stormComponentDebug;
+ }
+
+ public DisruptorQueue getReceiveQueue() {
+ return receiveQueue;
+ }
+
+ public AtomicBoolean getBackpressure() {
+ return backpressure;
+ }
+
+ public DisruptorQueue getTransferWorkerQueue() {
+ return transferQueue;
+ }
+
+ public IStormClusterState getStormClusterState() {
+ return stormClusterState;
+ }
+
+ public Map getWorkerData() {
+ return workerData;
+ }
+
+ public Map<String, Map<String, LoadAwareCustomStreamGrouping>> getStreamToComponentToGrouper() {
+ return streamToComponentToGrouper;
+ }
+
+ public HashMap getSharedExecutorData() {
+ return sharedExecutorData;
+ }
+
+ public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
+ return intervalToTaskToMetricToRegistry;
+ }
+
+ public IFn getTransferFn() {
+ return transferFn;
+ }
+
+ @VisibleForTesting
+ public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
+ this.executorTransfer = executorTransfer;
+ }
+}
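
Taken together, the new class exposes roughly the lifecycle the Clojure executor had: create, start the event loops, and later shut down. A minimal usage sketch based only on the signatures in this patch (assuming the worker has already built its shared workerData map and credentials):

    import com.google.common.collect.Lists;
    import java.util.List;
    import java.util.Map;
    import org.apache.storm.executor.Executor;
    import org.apache.storm.executor.ExecutorShutdown;

    final class ExecutorLifecycleSketch {
        // An executor id is the [first-task, last-task] range it covers; [1, 1] is a single task.
        static ExecutorShutdown start(Map workerData, Map<String, String> credentials) throws Exception {
            List<Long> executorId = Lists.newArrayList(1L, 1L);
            Executor executor = Executor.mkExecutor(workerData, executorId, credentials);
            return executor.execute(); // spawns the transfer and handler threads
        }

        static void stop(ExecutorShutdown running) {
            running.shutdown(); // halts the queues, joins the threads, runs hook cleanup
        }
    }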
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
new file mode 100644
index 0000000..795a33c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.Constants;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
+ private final Executor executor;
+ private final List<Utils.SmartThread> threads;
+ private final Map<Integer, Task> taskDatas;
+
+ public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+ this.executor = executor;
+ this.threads = threads;
+ this.taskDatas = taskDatas;
+ }
+
+ @Override
+ public ExecutorStats renderStats() {
+ return executor.getStats().renderStats();
+ }
+
+ @Override
+ public List<Long> getExecutorId() {
+ return executor.getExecutorId();
+ }
+
+ @Override
+ public void credentialsChanged(Credentials credentials) {
+ TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), (int) Constants.SYSTEM_TASK_ID,
+ Constants.CREDENTIALS_CHANGED_STREAM_ID);
+ List<AddressedTuple> addressedTuple = Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+ executor.getReceiveQueue().publish(addressedTuple);
+ }
+
+ @Override
+ public boolean getBackPressureFlag() {
+ return executor.getBackpressure().get();
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+ executor.getReceiveQueue().haltWithInterrupt();
+ executor.getTransferWorkerQueue().haltWithInterrupt();
+ for (Utils.SmartThread t : threads) {
+ t.interrupt();
+ t.join();
+ }
+ executor.getStats().cleanupStats();
+ for (Task task : taskDatas.values()) {
+ TopologyContext userContext = task.getUserContext();
+ for (ITaskHook hook : userContext.getHooks()) {
+ hook.cleanup();
+ }
+ }
+ executor.getStormClusterState().disconnect();
+ if (executor.getOpenOrPrepareWasCalled().get()) {
+ for (Task task : taskDatas.values()) {
+ Object object = task.getTaskObject();
+ if (object instanceof ISpout) {
+ ((ISpout) object).close();
+ } else if (object instanceof IBolt) {
+ ((IBolt) object).cleanup();
+ } else {
+ LOG.error("unknown component object");
+ }
+ }
+ }
+ LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
new file mode 100644
index 0000000..8707319
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.EventHandler;
+import org.apache.storm.Config;
+import org.apache.storm.serialization.KryoTupleSerializer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class ExecutorTransfer implements EventHandler, Callable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
+
+ private final DisruptorQueue batchTransferQueue;
+ private final KryoTupleSerializer serializer;
+ private final MutableObject cachedEmit;
+ private final IFn transferFn;
+ private final boolean isDebug;
+
+ public ExecutorTransfer(WorkerTopologyContext workerTopologyContext, DisruptorQueue batchTransferQueue, Map stormConf, IFn transferFn) {
+ this.batchTransferQueue = batchTransferQueue;
+ this.serializer = new KryoTupleSerializer(stormConf, workerTopologyContext);
+ this.cachedEmit = new MutableObject(new ArrayList<>());
+ this.transferFn = transferFn;
+ this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+ }
+
+ public void transfer(int task, Tuple tuple) {
+ AddressedTuple val = new AddressedTuple(task, tuple);
+ if (isDebug) {
+ LOG.info("TRANSFERRING tuple {}", val);
+ }
+ batchTransferQueue.publish(val);
+ }
+
+ @VisibleForTesting
+ public DisruptorQueue getBatchTransferQueue() {
+ return this.batchTransferQueue;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ batchTransferQueue.consumeBatchWhenAvailable(this);
+ return 0L;
+ }
+
+ public String getName() {
+ return batchTransferQueue.getName();
+ }
+
+ @Override
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
+ ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
+ cachedEvents.add(event);
+ if (endOfBatch) {
+ transferFn.invoke(serializer, cachedEvents);
+ cachedEmit.setObject(new ArrayList<>());
+ }
+ }
+}
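
The collector above batches outgoing tuples: onEvent appends each event to a cached list and only hands the whole batch to the worker transfer function when the disruptor signals end-of-batch. The same pattern in plain Java, with the disruptor and the Clojure transfer fn replaced by a list and a Consumer (conceptual sketch only):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    final class BatchAndFlush<T> {
        private List<T> cached = new ArrayList<>();
        private final Consumer<List<T>> flushFn;

        BatchAndFlush(Consumer<List<T>> flushFn) {
            this.flushFn = flushFn;
        }

        // Mirrors onEvent(event, sequence, endOfBatch).
        void onEvent(T event, boolean endOfBatch) {
            cached.add(event);
            if (endOfBatch) {
                flushFn.accept(cached);     // one downstream call per batch, not per tuple
                cached = new ArrayList<>(); // start a fresh batch
            }
        }
    }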
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
new file mode 100644
index 0000000..43d297d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ExecutorStats;
+
+import java.util.List;
+
+public interface IRunningExecutor {
+
+ ExecutorStats renderStats();
+ List<Long> getExecutorId();
+ void credentialsChanged(Credentials credentials);
+ boolean getBackPressureFlag();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
new file mode 100644
index 0000000..4b6d0fa
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/TupleInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TupleInfo implements Serializable {
+
+ private static final long serialVersionUID = -3348670497595864118L;
+
+ private int taskId;
+ private Object messageId;
+ private String stream;
+ private List<Object> values;
+ private long timestamp;
+ private String id;
+
+ public Object getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(Object messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
+
+ public void setValues(List<Object> values) {
+ this.values = values;
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this,
+ ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
new file mode 100644
index 0000000..2fceb28
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import clojure.lang.Atom;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+ private final Callable<Boolean> executeSampler;
+
+ public BoltExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+ }
+
+ @Override
+ public void init(Map<Integer, Task> idToTask) {
+ this.idToTask = idToTask;
+ LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ IBolt boltObject = (IBolt) taskData.getTaskObject();
+ TopologyContext userContext = taskData.getUserContext();
+ taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+ if (boltObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) boltObject).setCredentials(credentials);
+ }
+ if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+ "transfer", (DisruptorQueue) workerData.get("transfer-queue"));
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+ Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref();
+ BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+ BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext);
+ } else {
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+ }
+
+ IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+ boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Callable<Object> call() throws Exception {
+ while (!stormActive.get()) {
+ Utils.sleep(100);
+ }
+ return new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+ return 0L;
+ }
+ };
+ }
+
+ @Override
+ public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
+ String streamId = tuple.getSourceStreamId();
+ if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+ Object taskObject = idToTask.get(taskId).getTaskObject();
+ if (taskObject instanceof ICredentialsListener) {
+ ((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
+ }
+ } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+ metricsTick(idToTask.get(taskId), tuple);
+ } else {
+ IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
+ boolean isSampled = sampler.call();
+ boolean isExecuteSampler = executeSampler.call();
+ Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+ if (isSampled) {
+ tuple.setProcessSampleStartTime(now);
+ }
+ if (isExecuteSampler) {
+ tuple.setExecuteSampleStartTime(now);
+ }
+ boltObject.execute(tuple);
+
+ Long ms = tuple.getExecuteSampleStartTime();
+ long delta = (ms != null) ? Time.deltaMs(ms) : 0;
+ if (isDebug) {
+ LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
+ }
+ new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
new file mode 100644
index 0000000..34de025
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+ private final BoltExecutor executor;
+ private final Task taskData;
+ private final int taskId;
+ private final Random random;
+ private final boolean isEventLoggers;
+ private final boolean isDebug;
+
+ public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
+ boolean isEventLoggers, boolean isDebug) {
+ this.executor = executor;
+ this.taskData = taskData;
+ this.taskId = taskId;
+ this.random = random;
+ this.isEventLoggers = isEventLoggers;
+ this.isDebug = isDebug;
+ }
+
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ return boltEmit(streamId, anchors, tuple, null);
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ boltEmit(streamId, anchors, tuple, taskId);
+ }
+
+ private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+ List<Integer> outTasks;
+ if (targetTaskId != null) {
+ outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+ } else {
+ outTasks = taskData.getOutgoingTasks(streamId, values);
+ }
+
+ for (Integer t : outTasks) {
+ Map<Long, Long> anchorsToIds = new HashMap<>();
+ if (anchors != null) {
+ for (Tuple a : anchors) {
+ Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
+ if (rootIds.size() > 0) {
+ long edgeId = MessageId.generateId(random);
+ ((TupleImpl) a).updateAckVal(edgeId);
+ for (Long root_id : rootIds) {
+ putXor(anchorsToIds, root_id, edgeId);
+ }
+ }
+ }
+ }
+ MessageId msgId = MessageId.makeId(anchorsToIds);
+ TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+ executor.getExecutorTransfer().transfer(t, tupleExt);
+ }
+ if (isEventLoggers) {
+ executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+ }
+ return outTasks;
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ long ackValue = ((TupleImpl) input).getAckVal();
+ Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
+ for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
+ executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+ new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+ executor.getExecutorTransfer());
+ }
+ long delta = tupleTimeDelta((TupleImpl) input);
+ if (isDebug) {
+ LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+ }
+ BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+ boltAckInfo.applyOn(taskData.getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
+ input.getSourceComponent(), input.getSourceStreamId(), delta);
+ }
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ Set<Long> roots = input.getMessageId().getAnchors();
+ for (Long root : roots) {
+ executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer());
+ }
+ long delta = tupleTimeDelta((TupleImpl) input);
+ if (isDebug) {
+ LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
+ }
+ BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
+ boltFailInfo.applyOn(taskData.getUserContext());
+ if (delta != 0) {
+ ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
+ input.getSourceComponent(), input.getSourceStreamId(), delta);
+ }
+ }
+
+ @Override
+ public void resetTimeout(Tuple input) {
+ Set<Long> roots = input.getMessageId().getAnchors();
+ for (Long root : roots) {
+ executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer());
+ }
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ executor.getReportError().report(error);
+ }
+
+ private long tupleTimeDelta(TupleImpl tuple) {
+ Long ms = tuple.getProcessSampleStartTime();
+ if (ms != null)
+ return Time.deltaMs(ms);
+ return 0;
+ }
+
+ private void putXor(Map<Long, Long> pending, Long key, Long id) {
+ Long curr = pending.get(key);
+ if (curr == null) {
+ curr = 0L;
+ }
+ pending.put(key, Utils.bitXor(curr, id));
+ }
+}
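
The emit and ack paths above rely on Storm's XOR ack tracking: every new edge id is XORed both into the pending value kept per root id (putXor) and into the ack value the bolt later reports (updateAckVal), so once every emitted id has also been acked the pending value collapses back to zero. A self-contained illustration of that invariant, in plain Java with no Storm classes:

    import java.util.Random;

    final class XorAckSketch {
        public static void main(String[] args) {
            Random random = new Random();
            long pending = 0L;

            // A root tuple enters the topology with its own edge id.
            long rootEdge = random.nextLong();
            pending ^= rootEdge;

            // A bolt anchors two new tuples to it (emit path).
            long edgeA = random.nextLong();
            long edgeB = random.nextLong();
            pending ^= edgeA;
            pending ^= edgeB;

            // Acks arrive for the root and both children (ack path).
            pending ^= rootEdge;
            pending ^= edgeA;
            pending ^= edgeB;

            // Every id was XORed in exactly twice, so the tuple tree is fully acked.
            System.out.println("fully acked: " + (pending == 0L));
        }
    }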
http://git-wip-us.apache.org/repos/asf/storm/blob/a5e19d9b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
new file mode 100644
index 0000000..73451f8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+public interface IReportError {
+ void report(Throwable error);
+}
[4/5] storm git commit: Merge branch 'STORM-1277'
Posted by ka...@apache.org.
Merge branch 'STORM-1277'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7d450f3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7d450f3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7d450f3
Branch: refs/heads/master
Commit: c7d450f3fa871ff35452ac72b011e86c34446264
Parents: 44068c4 a5e19d9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 8 17:14:48 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 8 17:14:48 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 18 +-
.../clj/org/apache/storm/daemon/executor.clj | 839 -------------------
.../org/apache/storm/daemon/local_executor.clj | 42 +
.../src/clj/org/apache/storm/daemon/worker.clj | 70 +-
storm-core/src/clj/org/apache/storm/testing.clj | 16 +-
.../src/jvm/org/apache/storm/Constants.java | 24 +-
.../org/apache/storm/daemon/StormCommon.java | 175 ++--
.../src/jvm/org/apache/storm/daemon/Task.java | 76 +-
.../jvm/org/apache/storm/executor/Executor.java | 576 +++++++++++++
.../apache/storm/executor/ExecutorShutdown.java | 111 +++
.../apache/storm/executor/ExecutorTransfer.java | 87 ++
.../apache/storm/executor/IRunningExecutor.java | 31 +
.../org/apache/storm/executor/TupleInfo.java | 90 ++
.../storm/executor/bolt/BoltExecutor.java | 138 +++
.../executor/bolt/BoltOutputCollectorImpl.java | 171 ++++
.../storm/executor/error/IReportError.java | 22 +
.../storm/executor/error/ReportError.java | 76 ++
.../storm/executor/error/ReportErrorAndDie.java | 47 ++
.../storm/executor/spout/SpoutExecutor.java | 255 ++++++
.../spout/SpoutOutputCollectorImpl.java | 147 ++++
.../apache/storm/stats/BoltExecutorStats.java | 1 +
.../jvm/org/apache/storm/stats/CommonStats.java | 5 +-
.../apache/storm/stats/SpoutExecutorStats.java | 1 +
.../org/apache/storm/task/TopologyContext.java | 32 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 29 +
.../storm/utils/WorkerBackpressureThread.java | 11 +-
.../org/apache/storm/integration_test.clj | 11 +-
.../test/clj/org/apache/storm/grouping_test.clj | 2 +-
28 files changed, 2077 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------