You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:39 UTC
[4/9] storm git commit: STORM-2306 - Messaging subsystem redesign.
New Backpressure model.
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 8610040..63ac1a5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,12 +19,20 @@
package org.apache.storm.executor.bolt;
import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.function.BooleanSupplier;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
-
+import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.metrics.BuiltinBoltMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.worker.WorkerState;
@@ -32,54 +40,94 @@ import org.apache.storm.executor.Executor;
import org.apache.storm.hooks.info.BoltExecuteInfo;
import org.apache.storm.metric.api.IMetricsRegistrant;
import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
+import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.StatsUtil;
import org.apache.storm.task.IBolt;
-import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.JCQueue.ExitCondition;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Utils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class BoltExecutor extends Executor {
private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
- private final Callable<Boolean> executeSampler;
+ private final BooleanSupplier executeSampler;
+ private final boolean isSystemBoltExecutor;
+ private final IWaitStrategy consumeWaitStrategy; // employed when no incoming data
+ private final IWaitStrategy backPressureWaitStrategy; // employed when outbound path is congested
+ private BoltOutputCollectorImpl outputCollector;
+ private final BoltExecutorStats stats;
+ private final BuiltinMetrics builtInMetrics;
public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
- super(workerData, executorId, credentials);
+ super(workerData, executorId, credentials, StatsUtil.BOLT);
this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
+ this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
+ if (isSystemBoltExecutor) {
+ this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
+ } else {
+ this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
+ this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
+ }
+ this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+ this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
+ this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+ this.builtInMetrics = new BuiltinBoltMetrics(stats);
}
- public void init(Map<Integer, Task> idToTask) {
+ @Override
+ public BoltExecutorStats getStats() {
+ return stats;
+ }
+
+ private static IWaitStrategy makeSystemBoltWaitStrategy() {
+ WaitStrategyPark ws = new WaitStrategyPark();
+ HashMap conf = new HashMap<String, Object>();
+ conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
+ ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
+ return ws;
+ }
+
+ public void init(ArrayList<Task> idToTask, int idToTaskBase) {
+ executorTransfer.initLocalRecvQueues();
while (!stormActive.get()) {
Utils.sleep(100);
}
- this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
-
- LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
- for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
- Task taskData = entry.getValue();
+ if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System bolt doesn't call reportError()
+ this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
+ }
+ LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
+ for (Task taskData : idToTask) {
+ if (taskData == null) {
+ //This happens if the min id is too small
+ continue;
+ }
IBolt boltObject = (IBolt) taskData.getTaskObject();
TopologyContext userContext = taskData.getUserContext();
- taskData.getBuiltInMetrics().registerAll(topoConf, userContext);
+ builtInMetrics.registerAll(topoConf, userContext);
if (boltObject instanceof ICredentialsListener) {
((ICredentialsListener) boltObject).setCredentials(credentials);
}
if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue,
- "transfer", workerData.getTransferQueue());
+ Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
- Map cachedNodePortToSocket = (Map) workerData.getCachedNodeToPortSocket().get();
+ Map cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
@@ -93,46 +141,92 @@ public class BoltExecutor extends Executor {
}
}
} else {
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
+ Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
}
- IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug);
+ this.outputCollector = new BoltOutputCollectorImpl(this, taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
boltObject.prepare(topoConf, userContext, new OutputCollector(outputCollector));
}
openOrPrepareWasCalled.set(true);
- LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+ LOG.info("Prepared bolt {}:{}", componentId, taskIds);
setupTicks(false);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
+
+ return new Callable<Long>() {
+ private ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();
+ int bpIdleCount = 0;
+ int consumeIdleCounter = 0;
- return new Callable<Object>() {
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+ public Long call() throws Exception {
+ boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+ if (pendingEmitsIsEmpty) {
+ if (bpIdleCount != 0) {
+ LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
+ }
+ bpIdleCount = 0;
+ int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
+ if (consumeCount == 0) {
+ if (consumeIdleCounter == 0) {
+ LOG.debug("Invoking consume wait strategy");
+ }
+ consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ } else {
+ if (consumeIdleCounter != 0) {
+ LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
+ }
+ consumeIdleCounter = 0;
+ }
+ } else {
+ if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in a idle loop
+ LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
+ }
+ bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
+ }
+
return 0L;
}
+
+ // returns true if pendingEmits is empty
+ private boolean tryFlushPendingEmits() {
+ for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
+ if (executorTransfer.tryTransfer(t, null)) {
+ pendingEmits.poll();
+ } else { // to avoid reordering of emits, stop at first failure
+ return false;
+ }
+ }
+ return true;
+ }
+
};
}
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
- if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
- Object taskObject = idToTask.get(taskId).getTaskObject();
+ if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+ outputCollector.flush();
+ } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
+ metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
+ } else if (Constants.CREDENTIALS_CHANGED_STREAM_ID.equals(streamId)) {
+ Object taskObject = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (taskObject instanceof ICredentialsListener) {
((ICredentialsListener) taskObject).setCredentials((Map<String, String>) tuple.getValue(0));
}
- } else if (Constants.METRICS_TICK_STREAM_ID.equals(streamId)) {
- metricsTick(idToTask.get(taskId), tuple);
} else {
- IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
- boolean isSampled = sampler.call();
- boolean isExecuteSampler = executeSampler.call();
+ IBolt boltObject = (IBolt) idToTask.get(taskId - idToTaskBase).getTaskObject();
+ boolean isSampled = sampler.getAsBoolean();
+ boolean isExecuteSampler = executeSampler.getAsBoolean();
Long now = (isSampled || isExecuteSampler) ? Time.currentTimeMillis() : null;
if (isSampled) {
tuple.setProcessSampleStartTime(now);
@@ -147,11 +241,14 @@ public class BoltExecutor extends Executor {
if (isDebug) {
LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
}
- new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
+ TopologyContext topologyContext = idToTask.get(taskId - idToTaskBase).getUserContext();
+ if (!topologyContext.getHooks().isEmpty()) // perf critical check to avoid unnecessary allocation
+ {
+ new BoltExecuteInfo(tuple, taskId, delta).applyOn(topologyContext);
+ }
if (delta >= 0) {
- ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+ stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index 696447e..facbb75 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -6,15 +6,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.executor.bolt;
import java.util.Collection;
@@ -23,18 +24,21 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.stats.BoltExecutorStats;
import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,43 +47,60 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
private final BoltExecutor executor;
- private final Task taskData;
+ private final Task task;
private final int taskId;
private final Random random;
private final boolean isEventLoggers;
+ private final ExecutorTransfer xsfer;
+ private boolean ackingEnabled;
private final boolean isDebug;
- public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
- boolean isEventLoggers, boolean isDebug) {
+ public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, Random random,
+ boolean isEventLoggers, boolean ackingEnabled, boolean isDebug) {
this.executor = executor;
- this.taskData = taskData;
- this.taskId = taskId;
+ this.task = taskData;
+ this.taskId = taskData.getTaskId();
this.random = random;
this.isEventLoggers = isEventLoggers;
+ this.ackingEnabled = ackingEnabled;
this.isDebug = isDebug;
+ this.xsfer = executor.getExecutorTransfer();
}
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- return boltEmit(streamId, anchors, tuple, null);
+ try {
+ return boltEmit(streamId, anchors, tuple, null);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when emiting tuple.");
+ throw new RuntimeException(e);
+ }
}
@Override
public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- boltEmit(streamId, anchors, tuple, taskId);
+ try {
+ boltEmit(streamId, anchors, tuple, taskId);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when emiting tuple.");
+ throw new RuntimeException(e);
+ }
}
- private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+ private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values,
+ Integer targetTaskId) throws InterruptedException {
List<Integer> outTasks;
if (targetTaskId != null) {
- outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+ outTasks = task.getOutgoingTasks(targetTaskId, streamId, values);
} else {
- outTasks = taskData.getOutgoingTasks(streamId, values);
+ outTasks = task.getOutgoingTasks(streamId, values);
}
- for (Integer t : outTasks) {
- Map<Long, Long> anchorsToIds = new HashMap<>();
- if (anchors != null) {
- for (Tuple a : anchors) {
+ for (int i = 0; i < outTasks.size(); ++i) {
+ Integer t = outTasks.get(i);
+ MessageId msgId;
+ if (ackingEnabled && anchors != null) {
+ final Map<Long, Long> anchorsToIds = new HashMap<>();
+ for (Tuple a : anchors) { // perf critical path. would be nice to avoid iterator allocation here and below
Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
if (rootIds.size() > 0) {
long edgeId = MessageId.generateId(random);
@@ -89,54 +110,63 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
}
}
}
+ msgId = MessageId.makeId(anchorsToIds);
+ } else {
+ msgId = MessageId.makeUnanchored();
}
- MessageId msgId = MessageId.makeId(anchorsToIds);
- TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
- executor.getExecutorTransfer().transfer(t, tupleExt);
+ TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
+ xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits());
}
if (isEventLoggers) {
- executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+ task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits());
}
return outTasks;
}
@Override
public void ack(Tuple input) {
+ if (!ackingEnabled) {
+ return;
+ }
long ackValue = ((TupleImpl) input).getAckVal();
Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
- executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
- new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
- executor.getExecutorTransfer());
+ task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
+ new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
+ executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}
- BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
- boltAckInfo.applyOn(taskData.getUserContext());
+
+ if (!task.getUserContext().getHooks().isEmpty()) {
+ BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+ boltAckInfo.applyOn(task.getUserContext());
+ }
if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
- input.getSourceComponent(), input.getSourceStreamId(), delta);
+ executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
}
}
@Override
public void fail(Tuple input) {
+ if (!ackingEnabled) {
+ return;
+ }
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
- new Values(root), executor.getExecutorTransfer());
+ task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
- boltFailInfo.applyOn(taskData.getUserContext());
+ boltFailInfo.applyOn(task.getUserContext());
if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
- input.getSourceComponent(), input.getSourceStreamId(), delta);
+ executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
}
}
@@ -144,8 +174,18 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
public void resetTimeout(Tuple input) {
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
- new Values(root), executor.getExecutorTransfer());
+ task.sendUnanchored(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, new Values(root),
+ executor.getExecutorTransfer(), executor.getPendingEmits());
+ }
+ }
+
+ @Override
+ public void flush() {
+ try {
+ xsfer.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Bolt thread interrupted during flush()");
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 59139b1..5f6773c 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -18,27 +18,36 @@
package org.apache.storm.executor.spout;
import com.google.common.collect.ImmutableMap;
+
import java.util.concurrent.Callable;
+
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.BuiltinSpoutMetrics;
import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.hooks.info.SpoutAckInfo;
import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutWaitStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.DisruptorQueue;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
@@ -57,6 +66,7 @@ public class SpoutExecutor extends Executor {
private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
private final ISpoutWaitStrategy spoutWaitStrategy;
+ private final IWaitStrategy backPressureWaitStrategy;
private Integer maxSpoutPending;
private final AtomicBoolean lastActive;
private List<ISpout> spouts;
@@ -66,33 +76,46 @@ public class SpoutExecutor extends Executor {
private final SpoutThrottlingMetrics spoutThrottlingMetrics;
private final boolean hasAckers;
private RotatingMap<Long, TupleInfo> pending;
- private final boolean backPressureEnabled;
+ SpoutOutputCollectorImpl spoutOutputCollector;
+ private final SpoutExecutorStats stats;
+ private final BuiltinMetrics builtInMetrics;
+ private long threadId = 0;
public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
- super(workerData, executorId, credentials);
+ super(workerData, executorId, credentials, StatsUtil.SPOUT);
this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
this.spoutWaitStrategy.prepare(topoConf);
-
- this.backPressureEnabled = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+ this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+ this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
this.lastActive = new AtomicBoolean(false);
this.hasAckers = StormCommon.hasAckers(topoConf);
this.emittedCount = new MutableLong(0);
this.emptyEmitStreak = new MutableLong(0);
this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+ this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
+ this.builtInMetrics = new BuiltinSpoutMetrics(stats);
}
- public void init(final Map<Integer, Task> idToTask) {
+ @Override
+ public SpoutExecutorStats getStats() {
+ return stats;
+ }
+
+ public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
+ this.threadId = Thread.currentThread().getId();
+ executorTransfer.initLocalRecvQueues();
while (!stormActive.get()) {
Utils.sleep(100);
}
- LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+ LOG.info("Opening spout {}:{}", componentId, taskIds );
this.idToTask = idToTask;
this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
this.spouts = new ArrayList<>();
- for (Task task : idToTask.values()) {
- this.spouts.add((ISpout) task.getTaskObject());
+ for (Task task : idToTask) {
+ if(task!=null)
+ this.spouts.add((ISpout) task.getTaskObject());
}
this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
@Override
@@ -101,24 +124,27 @@ public class SpoutExecutor extends Executor {
if (tupleInfo.getTimestamp() != 0) {
timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
}
- failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+ failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
}
});
- this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
- this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext());
+ this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
+ this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.outputCollectors = new ArrayList<>();
- for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
- Task taskData = entry.getValue();
+ for (int i=0; i<idToTask.size(); ++i) {
+ Task taskData = idToTask.get(i);
+ if (taskData==null) {
+ continue;
+ }
ISpout spoutObject = (ISpout) taskData.getTaskObject();
- SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl(
- spoutObject, this, taskData, entry.getKey(), emittedCount,
+ spoutOutputCollector = new SpoutOutputCollectorImpl(
+ spoutObject, this, taskData, emittedCount,
hasAckers, rand, hasEventLoggers, isDebug, pending);
SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
this.outputCollectors.add(outputCollector);
- taskData.getBuiltInMetrics().registerAll(topoConf, taskData.getUserContext());
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
+ builtInMetrics.registerAll(topoConf, taskData.getUserContext());
+ Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
if (spoutObject instanceof ICredentialsListener) {
@@ -127,75 +153,152 @@ public class SpoutExecutor extends Executor {
spoutObject.open(topoConf, taskData.getUserContext(), outputCollector);
}
openOrPrepareWasCalled.set(true);
- LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ LOG.info("Opened spout {}:{}", componentId, taskIds);
setupTicks(true);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
-
- return new Callable<Object>() {
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
+ return new Callable<Long>() {
+ int recvqCheckSkips = 0;
+ final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
+ int bpIdleCount = 0;
+ int rmspCount = 0;
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatch(SpoutExecutor.this);
-
- final long currCount = emittedCount.get();
- final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
- final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
- final boolean isActive = stormActive.get();
- if (isActive) {
- if (!lastActive.get()) {
- lastActive.set(true);
- LOG.info("Activating spout {}:{}", componentId, idToTask.keySet());
- for (ISpout spout : spouts) {
- spout.activate();
- }
- }
- if (!sendQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
- for (ISpout spout : spouts) {
- spout.nextTuple();
- }
+ public Long call() throws Exception {
+ int receiveCount = 0;
+ if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
+ receiveCount = receiveQueue.consume(SpoutExecutor.this);
+ recvqCheckSkips = 0;
+ }
+ long currCount = emittedCount.get();
+ boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
+ boolean isActive = stormActive.get();
+
+ if (!isActive) {
+ inactiveExecute();
+ return 0L;
+ }
+
+ if (!lastActive.get()) {
+ lastActive.set(true);
+ activateSpouts();
+ }
+ boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+ boolean noEmits = true;
+ long emptyStretch = 0;
+
+ if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
+ for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
+ spouts.get(j).nextTuple();
}
- } else {
- if (lastActive.get()) {
- lastActive.set(false);
- LOG.info("Deactivating spout {}:{}", componentId, idToTask.keySet());
- for (ISpout spout : spouts) {
- spout.deactivate();
- }
+ noEmits = (currCount == emittedCount.get());
+ if (noEmits) {
+ emptyEmitStreak.increment();
+ } else {
+ emptyStretch = emptyEmitStreak.get();
+ emptyEmitStreak.set(0);
}
- long start = Time.currentTimeMillis();
- Time.sleep(100);
- spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
}
- if (currCount == emittedCount.get() && isActive) {
- emptyEmitStreak.increment();
- long start = Time.currentTimeMillis();
- spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
- if (throttleOn) {
- spoutThrottlingMetrics.skippedThrottleMs(Time.currentTimeMillis() - start);
- } else if (reachedMaxSpoutPending) {
- spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
- }
+ if (reachedMaxSpoutPending) {
+ if (rmspCount == 0)
+ LOG.debug("Reached max spout pending");
+ rmspCount++;
} else {
- emptyEmitStreak.set(0);
+ if (rmspCount > 0)
+ LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
+ rmspCount = 0;
+ }
+
+ if (receiveCount > 1) {
+ // continue without idling
+ return 0L;
+ }
+ if ( !pendingEmits.isEmpty() ) { // then facing backpressure
+ backPressureWaitStrategy();
+ return 0L;
+ }
+ bpIdleCount = 0;
+ if (noEmits) {
+ spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
+ return 0L;
}
return 0L;
}
+
+ private void backPressureWaitStrategy() throws InterruptedException {
+ long start = Time.currentTimeMillis();
+ if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
+ LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
+ }
+ bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
+ spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
+ }
+
+ // returns true if pendingEmits is empty
+ private boolean tryFlushPendingEmits() {
+ for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
+ if (executorTransfer.tryTransfer(t, null)) {
+ pendingEmits.poll();
+ } else { // to avoid reordering of emits, stop at first failure
+ return false;
+ }
+ }
+ return true;
+ }
};
}
+ private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) {
+ emptyEmitStreak.increment();
+ long start = Time.currentTimeMillis();
+ spoutWaitStrategy.emptyEmit(emptyEmitStreak.get());
+ if (reachedMaxSpoutPending) {
+ spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
+ } else {
+ if (emptyStretch > 0) {
+ LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
+ }
+ }
+ }
+
+ private void activateSpouts() {
+ LOG.info("Activating spout {}:{}", componentId, taskIds);
+ for (ISpout spout : spouts) {
+ spout.activate();
+ }
+ }
+
+ private void deactivateSpouts() {
+ LOG.info("Deactivating spout {}:{}", componentId, taskIds);
+ for (ISpout spout : spouts) {
+ spout.deactivate();
+ }
+ }
+
+ private void inactiveExecute() throws InterruptedException {
+ if (lastActive.get()) {
+ lastActive.set(false);
+ deactivateSpouts();
+ }
+ long start = Time.currentTimeMillis();
+ Time.sleep(100);
+ spoutThrottlingMetrics.skippedInactiveMs(Time.currentTimeMillis() - start);
+ }
+
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
String streamId = tuple.getSourceStreamId();
- if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+ spoutOutputCollector.flush();
+ } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
pending.rotate();
} else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
- metricsTick(idToTask.get(taskId), tuple);
+ metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
} else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
- Object spoutObj = idToTask.get(taskId).getTaskObject();
+ Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
if (spoutObj instanceof ICredentialsListener) {
((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
}
@@ -208,26 +311,28 @@ public class SpoutExecutor extends Executor {
} else {
Long id = (Long) tuple.getValue(0);
Long timeDeltaMs = (Long) tuple.getValue(1);
- TupleInfo tupleInfo = (TupleInfo) pending.remove(id);
+ TupleInfo tupleInfo = pending.remove(id);
if (tupleInfo != null && tupleInfo.getMessageId() != null) {
if (taskId != tupleInfo.getTaskId()) {
throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
}
- long startTimeMs = tupleInfo.getTimestamp();
Long timeDelta = null;
- if (startTimeMs != 0) {
- timeDelta = timeDeltaMs;
+ if (hasAckers) {
+ long startTimeMs = tupleInfo.getTimestamp();
+ if (startTimeMs != 0) {
+ timeDelta = timeDeltaMs;
+ }
}
if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
- ackSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo);
+ ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
} else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
- failSpoutMsg(this, idToTask.get(taskId), timeDelta, tupleInfo, "FAIL-STREAM");
+ failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
}
}
}
}
- public void ackSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
+ public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
@@ -235,16 +340,18 @@ public class SpoutExecutor extends Executor {
LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
}
spout.ack(tupleInfo.getMessageId());
- new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
- if (timeDelta != null) {
- ((SpoutExecutorStats) executor.getStats()).spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+ if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
+ new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
+ }
+ if (hasAckers && timeDelta!=null) {
+ executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
- public void failSpoutMsg(Executor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
+ public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
try {
ISpout spout = (ISpout) taskData.getTaskObject();
int taskId = taskData.getTaskId();
@@ -254,10 +361,21 @@ public class SpoutExecutor extends Executor {
spout.fail(tupleInfo.getMessageId());
new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
if (timeDelta != null) {
- ((SpoutExecutorStats) executor.getStats()).spoutFailedTuple(tupleInfo.getStream(), timeDelta);
+ executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta);
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
+
+
+ public int getSpoutRecvqCheckSkipCount() {
+ if(ackingEnabled)
+ return 0; // always check recQ if ACKing enabled
+ return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0);
+ }
+
+ public long getThreadId() {
+ return threadId;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
index 1d78d57..11acbc2 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -22,19 +22,25 @@ import org.apache.storm.daemon.Task;
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+/**
+ * Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
+ */
public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
-
+ private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorImpl.class);
private final SpoutExecutor executor;
private final Task taskData;
private final int taskId;
@@ -44,33 +50,57 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
private final Boolean isEventLoggers;
private final Boolean isDebug;
private final RotatingMap<Long, TupleInfo> pending;
+ private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
+ private final long spoutExecutorThdId;
@SuppressWarnings("unused")
- public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+ public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData,
MutableLong emittedCount, boolean hasAckers, Random random,
Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
this.executor = executor;
this.taskData = taskData;
- this.taskId = taskId;
+ this.taskId = taskData.getTaskId();
this.emittedCount = emittedCount;
this.hasAckers = hasAckers;
this.random = random;
this.isEventLoggers = isEventLoggers;
this.isDebug = isDebug;
this.pending = pending;
+ this.spoutExecutorThdId = executor.getThreadId();
}
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- return sendSpoutMsg(streamId, tuple, messageId, null);
+ try {
+ return sendSpoutMsg(streamId, tuple, messageId, null);
+ } catch (InterruptedException e) {
+ LOG.warn("Spout thread interrupted during emit().");
+ throw new RuntimeException(e);
+ }
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- sendSpoutMsg(streamId, tuple, messageId, taskId);
+ try {
+ sendSpoutMsg(streamId, tuple, messageId, taskId);
+ } catch (InterruptedException e) {
+ LOG.warn("Spout thread interrupted during emitDirect().");
+ throw new RuntimeException(e);
+ }
}
@Override
+ public void flush() {
+ try {
+ executor.getExecutorTransfer().flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Spout thread interrupted during flush().");
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
public long getPendingCount() {
return pending.size();
}
@@ -81,7 +111,7 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
executor.getReportError().report(error);
}
- private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
+ private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws InterruptedException {
emittedCount.increment();
List<Integer> outTasks;
@@ -91,11 +121,14 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
outTasks = taskData.getOutgoingTasks(stream, values);
}
- List<Long> ackSeq = new ArrayList<>();
- boolean needAck = (messageId != null) && hasAckers;
+ final boolean needAck = (messageId != null) && hasAckers;
+
+ final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
- long rootId = MessageId.generateId(random);
- for (Integer t : outTasks) {
+ final long rootId = needAck ? MessageId.generateId(random) : 0;
+
+ for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
+ Integer t = outTasks.get(i);
MessageId msgId;
if (needAck) {
long as = MessageId.generateId(random);
@@ -105,19 +138,16 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
msgId = MessageId.makeUnanchored();
}
- TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
- executor.getExecutorTransfer().transfer(t, tuple);
+ final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
+ AddressedTuple adrTuple = new AddressedTuple(t, tuple);
+ executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
}
if (isEventLoggers) {
- executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+ taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
}
- boolean sample = false;
- try {
- sample = executor.getSampler().call();
- } catch (Exception ignored) {
- }
if (needAck) {
+ boolean sample = executor.samplerCheck();
TupleInfo info = new TupleInfo();
info.setTaskId(this.taskId);
info.setStream(stream);
@@ -131,18 +161,24 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
pending.put(rootId, info);
List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
- executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
+ taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
} else if (messageId != null) {
- TupleInfo info = new TupleInfo();
- info.setStream(stream);
- info.setValues(values);
- info.setMessageId(messageId);
- info.setTimestamp(0);
- Long timeDelta = sample ? 0L : null;
- info.setId("0:");
- executor.ackSpoutMsg(executor, taskData, timeDelta, info);
+ // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
+ if (isDebug) {
+ if (spoutExecutorThdId != Thread.currentThread().getId()) {
+ throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
+ "Spout Output Collector should only emit from the main spout executor thread.");
+ }
+ }
+ globalTupleInfo.clear();
+ globalTupleInfo.setStream(stream);
+ globalTupleInfo.setValues(values);
+ globalTupleInfo.setMessageId(messageId);
+ globalTupleInfo.setTimestamp(0);
+ globalTupleInfo.setId("0:");
+ Long timeDelta = 0L;
+ executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
}
-
return outTasks;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
index ad8014c..f2c792a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
@@ -28,20 +28,19 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
+
public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
- private Random random;
private ArrayList<List<Integer>> choices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
- random = new Random();
choices = new ArrayList<List<Integer>>(targetTasks.size());
for (Integer i: targetTasks) {
choices.add(Arrays.asList(i));
}
- Collections.shuffle(choices, random);
current = new AtomicInteger(0);
+ Collections.shuffle(choices, new Random());
}
@Override
@@ -56,8 +55,6 @@ public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
current.set(0);
return choices.get(0);
}
- //race condition with another thread, and we lost
- // try again
- }
+ } // race condition with another thread, and we lost. try again
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
index 73a7f33..3b8cb6d 100644
--- a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -33,7 +33,8 @@ public class BoltExecuteInfo {
}
public void applyOn(TopologyContext topologyContext) {
- for (ITaskHook hook : topologyContext.getHooks()) {
+ for (int i = 0; i < topologyContext.getHooks().size(); i++) { // perf critical loop. dont use iterators
+ ITaskHook hook = topologyContext.getHooks().get(i);
hook.boltExecute(this);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index 5d097d7..8b33465 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -18,52 +18,67 @@
package org.apache.storm.messaging;
import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.netty.BackPressureStatus;
+
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.function.Supplier;
public interface IConnection {
+
/**
* Register a callback to be notified when data is ready to be processed.
* @param cb the callback to process the messages.
*/
- public void registerRecv(IConnectionCallback cb);
+ void registerRecv(IConnectionCallback cb);
+
+ /**
+ * Register a response generator to be used to send an initial response when a new client connects.
+ * @param cb the callback to process the connection.
+ */
+ void registerNewConnectionResponse(Supplier<Object> cb);
/**
* Send load metrics to all downstream connections.
* @param taskToLoad a map from the task id to the load for that task.
*/
- public void sendLoadMetrics(Map<Integer, Double> taskToLoad);
-
+ void sendLoadMetrics(Map<Integer, Double> taskToLoad);
+
+ /**
+ * Sends the back pressure metrics to all downstream connections.
+ */
+ void sendBackPressureStatus(BackPressureStatus bpStatus);
+
/**
* send a message with taskId and payload
* @param taskId task ID
* @param payload
*/
- public void send(int taskId, byte[] payload);
+ void send(int taskId, byte[] payload);
/**
* send batch messages
* @param msgs
*/
- public void send(Iterator<TaskMessage> msgs);
+ void send(Iterator<TaskMessage> msgs);
/**
* Get the current load for the given tasks
* @param tasks the tasks to look for.
* @return a Load for each of the tasks it knows about.
*/
- public Map<Integer, Load> getLoad(Collection<Integer> tasks);
+ Map<Integer, Load> getLoad(Collection<Integer> tasks);
/**
* Get the port for this connection
* @return The port this connection is using
*/
- public int getPort();
+ int getPort();
/**
* close this connection
*/
- public void close();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index c5c2261..00b7546 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -18,6 +18,7 @@
package org.apache.storm.messaging;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This interface needs to be implemented for messaging plugin.
@@ -33,12 +34,12 @@ public interface IContext {
* This method is invoked at the startup of messaging plugin
* @param topoConf storm configuration
*/
- public void prepare(Map<String, Object> topoConf);
+ void prepare(Map<String, Object> topoConf);
/**
* This method is invoked when a worker is unloading a messaging plugin
*/
- public void term();
+ void term();
/**
* This method establishes a server side connection
@@ -46,14 +47,15 @@ public interface IContext {
* @param port port #
* @return server side connection
*/
- public IConnection bind(String storm_id, int port);
+ IConnection bind(String storm_id, int port);
/**
* This method establish a client side connection to a remote server
* @param storm_id topology ID
* @param host remote host
* @param port remote port
+ * @param remoteBpStatus array of booleans reflecting Back Pressure status of remote tasks.
* @return client side connection
*/
- public IConnection connect(String storm_id, String host, int port);
+ IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 23e934a..0babd5f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.messaging.local;
+import org.apache.storm.messaging.netty.BackPressureStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
@@ -57,6 +61,11 @@ public class Context implements IContext {
}
@Override
+ public void registerNewConnectionResponse(Supplier<Object> cb) {
+ return;
+ }
+
+ @Override
public void send(int taskId, byte[] payload) {
throw new IllegalArgumentException("SHOULD NOT HAPPEN");
}
@@ -84,6 +93,11 @@ public class Context implements IContext {
}
@Override
+ public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+ throw new RuntimeException("Local Server connection should not send BackPressure status");
+ }
+
+ @Override
public int getPort() {
return port;
}
@@ -130,7 +144,12 @@ public class Context implements IContext {
public void registerRecv(IConnectionCallback cb) {
throw new IllegalArgumentException("SHOULD NOT HAPPEN");
}
-
+
+ @Override
+ public void registerNewConnectionResponse(Supplier<Object> cb) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
private void flushPending(){
IConnectionCallback serverCb = _server._cb;
if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
@@ -180,6 +199,11 @@ public class Context implements IContext {
}
@Override
+ public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+ throw new RuntimeException("Local Client connection should not send BackPressure status");
+ }
+
+ @Override
public int getPort() {
return _server.getPort();
}
@@ -221,7 +245,7 @@ public class Context implements IContext {
}
@Override
- public IConnection connect(String storm_id, String host, int port) {
+ public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
return new LocalClient(getLocalServer(storm_id, port));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
new file mode 100644
index 0000000..0996ff0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+ public static final short IDENTIFIER = (short)-600;
+ private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+ private static final int SIZE_OF_INT = 4;
+
+ private static AtomicLong bpCount = new AtomicLong(0);
+
+ public String workerId;
+ public final long id; // monotonically increasing id for correlating sent/recvd msgs. ok if restarts from 0 on crash.
+ public Collection<Integer> bpTasks; // task Ids experiencing BP. can be null
+ public Collection<Integer> nonBpTasks; // task Ids no longer experiencing BP. can be null
+
+ public BackPressureStatus() {
+ this.id = bpCount.incrementAndGet();
+ }
+ public BackPressureStatus(String workerId, Collection<Integer> bpTasks, Collection<Integer> nonBpTasks) {
+ this.workerId = workerId;
+ this.id = bpCount.incrementAndGet();
+ this.bpTasks = bpTasks;
+ this.nonBpTasks = nonBpTasks;
+ }
+
+ @Override
+ public String toString() {
+ return "{worker=" + workerId + ", bpStatusId=" + id + ", bpTasks=" + bpTasks + ", nonBpTasks=" + nonBpTasks + '}';
+ }
+
+ /** Encoded as
+ * -600 ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ public ChannelBuffer buffer(KryoValuesSerializer ser) throws IOException {
+ byte[] serializedBytes = ser.serializeObject(this);
+ ChannelBuffer buff = ChannelBuffers.buffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
+ buff.writeShort(IDENTIFIER);
+ buff.writeInt(serializedBytes.length);
+ buff.writeBytes(serializedBytes);
+ return buff;
+ }
+
+ public static BackPressureStatus read(byte[] bytes, KryoValuesDeserializer deserializer) {
+ return (BackPressureStatus) deserializer.deserializeObject(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index fcaf4e5..ac92c6b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -17,25 +17,19 @@
*/
package org.apache.storm.messaging.netty;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.lang.InterruptedException;
-
import org.apache.storm.Config;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.ConnectionWithStatus;
-import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
+import org.apache.storm.policy.WaitStrategyProgressive;
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
@@ -48,11 +42,22 @@ import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkState;
@@ -77,6 +82,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private static final long NO_DELAY_MS = 0L;
private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
+ KryoValuesSerializer ser;
+ KryoValuesDeserializer deser;
+
private final Map<String, Object> topoConf;
private final StormBoundedExponentialBackoffRetry retryPolicy;
private final ClientBootstrap bootstrap;
@@ -140,18 +148,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final MessageBuffer batcher;
- private final Object writeLock = new Object();
+ // wait strategy when the netty channel is not writable
+ private final IWaitStrategy waitStrategy;
@SuppressWarnings("rawtypes")
- Client(Map<String, Object> topoConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+ Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
this.topoConf = topoConf;
closing = false;
this.scheduler = scheduler;
this.context = context;
int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
+ int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
// if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
- LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+ LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
+ host, port, bufferSize, lowWatermark, highWatermark);
int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
@@ -160,13 +172,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
// Initiate connection to remote destination
- bootstrap = createClientBootstrap(factory, bufferSize, topoConf);
+ bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
dstHost = host;
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
launchChannelAliveThread();
scheduleConnect(NO_DELAY_MS);
batcher = new MessageBuffer(messageBatchSize);
+ String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
+ if (clazz == null) {
+ waitStrategy = new WaitStrategyProgressive();
+ } else {
+ waitStrategy = ReflectionUtils.newInstance(clazz);
+ }
+ waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
+ ser = new KryoValuesSerializer(topoConf);
+ deser = new KryoValuesDeserializer(topoConf);
}
/**
@@ -194,12 +215,17 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
}, 0, CHANNEL_ALIVE_INTERVAL_MS);
}
- private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map<String, Object> topoConf) {
+ private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize,
+ int lowWatermark, int highWatermark,
+ Map<String, Object> topoConf,
+ AtomicBoolean[] remoteBpStatus) {
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("sendBufferSize", bufferSize);
bootstrap.setOption("keepAlive", true);
- bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, topoConf));
+ bootstrap.setOption("writeBufferLowWaterMark", lowWatermark);
+ bootstrap.setOption("writeBufferHighWaterMark", highWatermark);
+ bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
return bootstrap;
}
@@ -262,11 +288,21 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
}
@Override
+ public void registerNewConnectionResponse(Supplier<Object> cb) {
+ throw new UnsupportedOperationException("Client does not accept new connections");
+ }
+
+ @Override
public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
throw new RuntimeException("Client connection should not send load metrics");
}
@Override
+ public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+ throw new RuntimeException("Client connection should not send BackPressure status");
+ }
+
+ @Override
public void send(int taskId, byte[] payload) {
TaskMessage msg = new TaskMessage(taskId, payload);
List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
@@ -281,7 +317,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
public void send(Iterator<TaskMessage> msgs) {
if (closing) {
int numMessages = iteratorSize(msgs);
- LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
+ LOG.error("Dropping {} messages because the Netty client to {} is being closed", numMessages,
dstAddressPrefixedName);
return;
}
@@ -302,31 +338,39 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
dropMessages(msgs);
return;
}
-
- synchronized (writeLock) {
+ try {
while (msgs.hasNext()) {
TaskMessage message = msgs.next();
- MessageBatch full = batcher.add(message);
- if(full != null){
- flushMessages(channel, full);
+ MessageBatch batch = batcher.add(message);
+ if (batch != null) {
+ writeMessage(channel, batch);
}
}
+ MessageBatch batch = batcher.drain();
+ if (batch != null) {
+ writeMessage(channel, batch);
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception when sending message to remote worker.", e);
+ dropMessages(msgs);
}
+ }
- if(channel.isWritable()){
- synchronized (writeLock) {
- // Netty's internal buffer is not full and we still have message left in the buffer.
- // We should write the unfilled MessageBatch immediately to reduce latency
- MessageBatch batch = batcher.drain();
- if(batch != null) {
- flushMessages(channel, batch);
+ private void writeMessage(Channel channel, MessageBatch batch) throws IOException {
+ try {
+ int idleCounter = 0;
+ while (!channel.isWritable()) {
+ if (idleCounter == 0) { // check avoids multiple log msgs when in a idle loop
+ LOG.debug("Experiencing Back Pressure from Netty. Entering BackPressure Wait");
+ }
+ if (!channel.isConnected()) {
+ throw new IOException("Connection disconnected");
}
+ idleCounter = waitStrategy.idle(idleCounter);
}
- } else {
- // Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
- // batch. This yields better throughput.
- // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
- // because we know `Channel.isWritable` was false after the messages were already in the buffer.
+ flushMessages(channel, batch);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
@@ -357,6 +401,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// We consume the iterator by traversing and thus "emptying" it.
int msgCount = iteratorSize(msgs);
messagesLost.getAndAdd(msgCount);
+ LOG.info("Dropping {} messages", msgCount);
}
private int iteratorSize(Iterator<TaskMessage> msgs) {
@@ -548,13 +593,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
* @param channel
*/
public void notifyInterestChanged(Channel channel) {
- if(channel.isWritable()){
- synchronized (writeLock) {
- // Channel is writable again, write if there are any messages pending
- MessageBatch pending = batcher.drain();
- flushMessages(channel, pending);
- }
- }
+ // NOOP since we are checking channel.isWritable in writeMessage
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 486cd03..5a169de 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.messaging.IConnection;
@@ -34,7 +35,6 @@ public class Context implements IContext {
private Map<String, Object> topoConf;
private Map<String, IConnection> connections;
private NioClientSocketChannelFactory clientChannelFactory;
-
private HashedWheelTimer clientScheduleService;
/**
@@ -72,13 +72,13 @@ public class Context implements IContext {
/**
* establish a connection to a remote server
*/
- public synchronized IConnection connect(String storm_id, String host, int port) {
+ public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
IConnection connection = connections.get(key(host,port));
if(connection !=null)
{
return connection;
}
- IConnection client = new Client(topoConf, clientChannelFactory,
+ IConnection client = new Client(topoConf,remoteBpStatus, clientChannelFactory,
clientScheduleService, host, port, this);
connections.put(key(host, client.getPort()), client);
return client;
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index 9030424..87c16af 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -21,20 +21,28 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.serialization.KryoValuesDeserializer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
-public class MessageDecoder extends FrameDecoder {
+public class MessageDecoder extends FrameDecoder {
+
+ private KryoValuesDeserializer deser;
+
+ public MessageDecoder(KryoValuesDeserializer deser) {
+ this.deser = deser;
+ }
+
/*
- * Each ControlMessage is encoded as:
- * code (<0) ... short(2)
- * Each TaskMessage is encoded as:
- * task (>=0) ... short(2)
- * len ... int(4)
- * payload ... byte[] *
- */
+ * Each ControlMessage is encoded as:
+ * code (<0) ... short(2)
+ * Each TaskMessage is encoded as:
+ * task (>=0) ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
// Make sure that we have received at least a short
long available = buf.readableBytes();
@@ -99,7 +107,24 @@ public class MessageDecoder extends FrameDecoder {
return new SaslMessageToken(payload.array());
}
- // case 3: task Message
+ // case 3: BackPressureStatus
+ if (code == BackPressureStatus.IDENTIFIER) {
+ available = buf.readableBytes();
+ if(available < 4)
+ return null;
+ int dataLen = buf.readInt();
+ if (available < 4 + dataLen) {
+ // need more data
+ buf.resetReaderIndex();
+ return null;
+ }
+ byte[] bytes = new byte[dataLen];
+ buf.readBytes(bytes);
+ return BackPressureStatus.read(bytes, deser);
+
+ }
+
+ // case 4: task Message
// Make sure that we have received at least an integer (length)
if (available < 4) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
index 0e9fc98..b714c48 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
@@ -17,11 +17,19 @@
*/
package org.apache.storm.messaging.netty;
+import org.apache.storm.serialization.KryoValuesSerializer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-public class MessageEncoder extends OneToOneEncoder {
+public class MessageEncoder extends OneToOneEncoder {
+
+ private KryoValuesSerializer ser;
+
+ public MessageEncoder(KryoValuesSerializer ser) {
+ this.ser = ser;
+ }
+
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
if (obj instanceof ControlMessage) {
@@ -30,8 +38,12 @@ public class MessageEncoder extends OneToOneEncoder {
if (obj instanceof MessageBatch) {
return ((MessageBatch)obj).buffer();
- }
-
+ }
+
+ if (obj instanceof BackPressureStatus) {
+ return ((BackPressureStatus)obj).buffer(ser);
+ }
+
if (obj instanceof SaslMessageToken) {
return ((SaslMessageToken)obj).buffer();
}