You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/14 20:31:43 UTC
[1/3] storm git commit: Remove auto acking/anchoring for bolts in a
stateful topology
Repository: storm
Updated Branches:
refs/heads/1.x-branch 8922f2037 -> 80213bae7
Remove auto acking/anchoring for bolts in a stateful topology
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ff57a54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ff57a54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ff57a54
Branch: refs/heads/1.x-branch
Commit: 2ff57a541f0a38a2b3d537d45e6a2c8955f23b15
Parents: dd15771
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Mar 9 16:45:17 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:23:38 2016 -0400
----------------------------------------------------------------------
.../src/jvm/storm/starter/StatefulTopology.java | 1 +
.../topology/CheckpointTupleForwarder.java | 21 ++++-----
.../apache/storm/topology/IStatefulBolt.java | 7 ++-
.../storm/topology/StatefulBoltExecutor.java | 46 ++++++++++++++++----
.../apache/storm/topology/TopologyBuilder.java | 5 ++-
.../topology/StatefulBoltExecutorTest.java | 1 +
6 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index d09ceea..ba513dd 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -90,6 +90,7 @@ public class StatefulTopology {
LOG.debug("{} sum = {}", name, sum);
kvState.put("sum", sum);
collector.emit(input, new Values(sum));
+ collector.ack(input);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index cbb3215..11d0384 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -51,7 +51,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
private final Map<TransactionRequest, Integer> transactionRequestCount;
private int checkPointInputTaskCount;
private long lastTxid = Long.MIN_VALUE;
- protected AnchoringOutputCollector collector;
+ private AnchoringOutputCollector collector;
public CheckpointTupleForwarder(IRichBolt bolt) {
this.bolt = bolt;
@@ -60,9 +60,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = new AnchoringOutputCollector(collector);
+ init(context, collector);
bolt.prepare(stormConf, context, this.collector);
- checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+ }
+
+ protected void init(TopologyContext context, OutputCollector collector) {
+ this.collector = new AnchoringOutputCollector(collector);
+ this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
}
@Override
@@ -114,7 +118,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
* @param input the input tuple
*/
protected void handleTuple(Tuple input) {
- collector.setContext(input);
bolt.execute(input);
}
@@ -224,24 +227,18 @@ public class CheckpointTupleForwarder implements IRichBolt {
protected static class AnchoringOutputCollector extends OutputCollector {
- private Tuple inputTuple;
-
AnchoringOutputCollector(IOutputCollector delegate) {
super(delegate);
}
- void setContext(Tuple inputTuple) {
- this.inputTuple = inputTuple;
- }
-
@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
- return emit(streamId, inputTuple, tuple);
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- emitDirect(taskId, streamId, inputTuple, tuple);
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index 1c2c5fc..ed55e1d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -20,7 +20,12 @@ package org.apache.storm.topology;
import org.apache.storm.state.State;
/**
- * A bolt abstraction for supporting stateful computation.
+ * A bolt abstraction for supporting stateful computation. The state of the bolt is
+ * periodically checkpointed.
+ *
+ * <p>The framework provides at-least once guarantee for the
+ * state updates. The stateful bolts are expected to anchor the tuples while emitting
+ * and ack the input tuples once its processed.</p>
*/
public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index c9c36ee..237305e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,8 +28,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.storm.spout.CheckPointState.Action;
import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
@@ -47,7 +51,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
private boolean boltInitialized = false;
private List<Tuple> pendingTuples = new ArrayList<>();
private List<Tuple> preparedTuples = new ArrayList<>();
- private List<Tuple> executedTuples = new ArrayList<>();
+ private AckTrackingOutputCollector collector;
public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
super(bolt);
@@ -63,7 +67,9 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
// package access for unit tests
void prepare(Map stormConf, TopologyContext context, OutputCollector collector, State state) {
- super.prepare(stormConf, context, collector);
+ init(context, collector);
+ this.collector = new AckTrackingOutputCollector(collector);
+ bolt.prepare(stormConf, context, this.collector);
this.state = state;
}
@@ -74,8 +80,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
if (boltInitialized) {
bolt.prePrepare(txid);
state.prepareCommit(txid);
- preparedTuples.addAll(executedTuples);
- executedTuples.clear();
+ preparedTuples.addAll(collector.ackedTuples());
} else {
/*
* May be the task restarted in the middle and the state needs be initialized.
@@ -93,7 +98,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
bolt.preRollback();
state.rollback();
fail(preparedTuples);
- fail(executedTuples);
+ fail(collector.ackedTuples());
} else if (action == INITSTATE) {
if (!boltInitialized) {
bolt.initState((T) state);
@@ -109,7 +114,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
}
collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
- collector.ack(checkpointTuple);
+ collector.delegate.ack(checkpointTuple);
}
@Override
@@ -123,16 +128,14 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
private void doExecute(Tuple tuple) {
- collector.setContext(tuple);
bolt.execute(tuple);
- executedTuples.add(tuple);
}
private void ack(List<Tuple> tuples) {
if (!tuples.isEmpty()) {
LOG.debug("Acking {} tuples", tuples.size());
for (Tuple tuple : tuples) {
- collector.ack(tuple);
+ collector.delegate.ack(tuple);
}
tuples.clear();
}
@@ -148,4 +151,29 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
}
+ private static class AckTrackingOutputCollector extends AnchoringOutputCollector {
+ private final OutputCollector delegate;
+ private final Queue<Tuple> ackedTuples;
+
+ AckTrackingOutputCollector(OutputCollector delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ this.ackedTuples = new ConcurrentLinkedQueue<>();
+ }
+
+ List<Tuple> ackedTuples() {
+ List<Tuple> result = new ArrayList<>();
+ Iterator<Tuple> it = ackedTuples.iterator();
+ while(it.hasNext()) {
+ result.add(it.next());
+ it.remove();
+ }
+ return result;
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ ackedTuples.add(input);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index af41553..5b7d499 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -234,7 +234,10 @@ public class TopologyBuilder {
* state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
* is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
* with its previously saved state.
- *
+ * <p>
+ * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+ * are expected to anchor the tuples while emitting and ack the input tuples once its processed.
+ * </p>
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
index 69c541b..6606491 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
@@ -170,6 +170,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
+ mockOutputCollector.ack(mockTuple);
Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);
[2/3] storm git commit: Fix stateful topology acking behavior
Posted by pt...@apache.org.
Fix stateful topology acking behavior
Right now, acking is automatically taken care of for the non-stateful bolts in a stateful topology.
This leads to double acking if BaseRichBolts are part of the topology.
For non-stateful bolts, it's better to let the bolt do the acking itself rather than acking automatically.
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd157719
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd157719
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd157719
Branch: refs/heads/1.x-branch
Commit: dd1577193d991489a7c46c50dcfa86db9fac5af2
Parents: 8922f20
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Mar 7 11:30:35 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:23:38 2016 -0400
----------------------------------------------------------------------
.../storm/starter/spout/RandomIntegerSpout.java | 15 ++++++++++++++-
.../storm/topology/CheckpointTupleForwarder.java | 1 -
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dd157719/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
index f6a35bf..e031f6e 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
@@ -24,6 +24,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Random;
@@ -33,6 +35,7 @@ import java.util.Random;
* every 100 ms. The ts field can be used in tuple time based windowing.
*/
public class RandomIntegerSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(RandomIntegerSpout.class);
private SpoutOutputCollector collector;
private Random rand;
private long msgId = 0;
@@ -51,6 +54,16 @@ public class RandomIntegerSpout extends BaseRichSpout {
@Override
public void nextTuple() {
Utils.sleep(100);
- collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId));
+ collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId);
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.debug("Got ACK for msgId : " + msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.debug("Got FAIL for msgId : " + msgId);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dd157719/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 675be57..cbb3215 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -116,7 +116,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
protected void handleTuple(Tuple input) {
collector.setContext(input);
bolt.execute(input);
- collector.ack(input);
}
/**
[3/3] storm git commit: add STORM-1608 to changelog
Posted by pt...@apache.org.
add STORM-1608 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/80213bae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/80213bae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/80213bae
Branch: refs/heads/1.x-branch
Commit: 80213bae74d2ca3c9079f26e3070d71b1256c70a
Parents: 2ff57a5
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Mar 14 15:24:34 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:24:34 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/80213bae/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa32a0a..48254bd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1608: Fix stateful topology acking behavior
* STORM-1609: Netty Client is not best effort delivery on failed Connection
* STORM-1620: Update curator to fix CURATOR-209
* STORM-1469: Decommission SimpleTransportPlugin and configuration