You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/14 20:31:29 UTC
[2/4] storm git commit: Remove auto acking/anchoring for bolts in a
stateful topology
Remove auto acking/anchoring for bolts in a stateful topology
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b2a98f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b2a98f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b2a98f1
Branch: refs/heads/master
Commit: 2b2a98f14383f7d50b359964fd89e6ea31a9a673
Parents: c0bce3e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Mar 9 16:45:17 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Mar 9 22:52:09 2016 +0530
----------------------------------------------------------------------
.../src/jvm/storm/starter/StatefulTopology.java | 1 +
.../topology/CheckpointTupleForwarder.java | 21 ++++-----
.../apache/storm/topology/IStatefulBolt.java | 7 ++-
.../storm/topology/StatefulBoltExecutor.java | 46 ++++++++++++++++----
.../apache/storm/topology/TopologyBuilder.java | 5 ++-
.../topology/StatefulBoltExecutorTest.java | 1 +
6 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index d09ceea..ba513dd 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -90,6 +90,7 @@ public class StatefulTopology {
LOG.debug("{} sum = {}", name, sum);
kvState.put("sum", sum);
collector.emit(input, new Values(sum));
+ collector.ack(input);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index cbb3215..11d0384 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -51,7 +51,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
private final Map<TransactionRequest, Integer> transactionRequestCount;
private int checkPointInputTaskCount;
private long lastTxid = Long.MIN_VALUE;
- protected AnchoringOutputCollector collector;
+ private AnchoringOutputCollector collector;
public CheckpointTupleForwarder(IRichBolt bolt) {
this.bolt = bolt;
@@ -60,9 +60,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = new AnchoringOutputCollector(collector);
+ init(context, collector);
bolt.prepare(stormConf, context, this.collector);
- checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+ }
+
+ protected void init(TopologyContext context, OutputCollector collector) {
+ this.collector = new AnchoringOutputCollector(collector);
+ this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
}
@Override
@@ -114,7 +118,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
* @param input the input tuple
*/
protected void handleTuple(Tuple input) {
- collector.setContext(input);
bolt.execute(input);
}
@@ -224,24 +227,18 @@ public class CheckpointTupleForwarder implements IRichBolt {
protected static class AnchoringOutputCollector extends OutputCollector {
- private Tuple inputTuple;
-
AnchoringOutputCollector(IOutputCollector delegate) {
super(delegate);
}
- void setContext(Tuple inputTuple) {
- this.inputTuple = inputTuple;
- }
-
@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
- return emit(streamId, inputTuple, tuple);
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- emitDirect(taskId, streamId, inputTuple, tuple);
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index 1c2c5fc..ed55e1d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -20,7 +20,12 @@ package org.apache.storm.topology;
import org.apache.storm.state.State;
/**
- * A bolt abstraction for supporting stateful computation.
+ * A bolt abstraction for supporting stateful computation. The state of the bolt is
+ * periodically checkpointed.
+ *
+ * <p>The framework provides an at-least-once guarantee for the
+ * state updates. Stateful bolts are expected to anchor tuples when emitting
+ * and to ack each input tuple once it has been processed.</p>
*/
public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index c9c36ee..237305e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,8 +28,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.storm.spout.CheckPointState.Action;
import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
@@ -47,7 +51,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
private boolean boltInitialized = false;
private List<Tuple> pendingTuples = new ArrayList<>();
private List<Tuple> preparedTuples = new ArrayList<>();
- private List<Tuple> executedTuples = new ArrayList<>();
+ private AckTrackingOutputCollector collector;
public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
super(bolt);
@@ -63,7 +67,9 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
// package access for unit tests
void prepare(Map stormConf, TopologyContext context, OutputCollector collector, State state) {
- super.prepare(stormConf, context, collector);
+ init(context, collector);
+ this.collector = new AckTrackingOutputCollector(collector);
+ bolt.prepare(stormConf, context, this.collector);
this.state = state;
}
@@ -74,8 +80,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
if (boltInitialized) {
bolt.prePrepare(txid);
state.prepareCommit(txid);
- preparedTuples.addAll(executedTuples);
- executedTuples.clear();
+ preparedTuples.addAll(collector.ackedTuples());
} else {
/*
* May be the task restarted in the middle and the state needs be initialized.
@@ -93,7 +98,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
bolt.preRollback();
state.rollback();
fail(preparedTuples);
- fail(executedTuples);
+ fail(collector.ackedTuples());
} else if (action == INITSTATE) {
if (!boltInitialized) {
bolt.initState((T) state);
@@ -109,7 +114,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
}
collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
- collector.ack(checkpointTuple);
+ collector.delegate.ack(checkpointTuple);
}
@Override
@@ -123,16 +128,14 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
private void doExecute(Tuple tuple) {
- collector.setContext(tuple);
bolt.execute(tuple);
- executedTuples.add(tuple);
}
private void ack(List<Tuple> tuples) {
if (!tuples.isEmpty()) {
LOG.debug("Acking {} tuples", tuples.size());
for (Tuple tuple : tuples) {
- collector.ack(tuple);
+ collector.delegate.ack(tuple);
}
tuples.clear();
}
@@ -148,4 +151,29 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
}
+ private static class AckTrackingOutputCollector extends AnchoringOutputCollector {
+ private final OutputCollector delegate;
+ private final Queue<Tuple> ackedTuples;
+
+ AckTrackingOutputCollector(OutputCollector delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ this.ackedTuples = new ConcurrentLinkedQueue<>();
+ }
+
+ List<Tuple> ackedTuples() {
+ List<Tuple> result = new ArrayList<>();
+ Iterator<Tuple> it = ackedTuples.iterator();
+ while(it.hasNext()) {
+ result.add(it.next());
+ it.remove();
+ }
+ return result;
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ ackedTuples.add(input);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index af41553..5b7d499 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -234,7 +234,10 @@ public class TopologyBuilder {
* state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
* is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
* with its previously saved state.
- *
+ * <p>
+ * The framework provides an at-least-once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+ * are expected to anchor tuples when emitting and to ack each input tuple once it has been processed.
+ * </p>
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
index 69c541b..6606491 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
@@ -170,6 +170,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
+ mockOutputCollector.ack(mockTuple);
Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);