You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/04/20 10:23:44 UTC
[1/4] storm git commit: [STORM-1714] StatefulBolts ends up as normal
bolts while using TopologyBuilder.setBolt without parallelism
Repository: storm
Updated Branches:
refs/heads/1.x-branch a7370a62f -> 3399a4a46
[STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b879d62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b879d62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b879d62
Branch: refs/heads/1.x-branch
Commit: 3b879d62d68ac75310d12784d8d1ad4213fdd38e
Parents: a7370a6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Apr 15 17:36:07 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:18:12 2016 +0900
----------------------------------------------------------------------
.../topology/CheckpointTupleForwarder.java | 8 ++++
.../apache/storm/topology/IStatefulBolt.java | 19 +++++++-
.../storm/topology/StatefulBoltExecutor.java | 20 ++++++--
.../apache/storm/topology/TopologyBuilder.java | 48 ++++++++++++++++++++
4 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 11d0384..9d21c33 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
private long lastTxid = Long.MIN_VALUE;
private AnchoringOutputCollector collector;
+ public CheckpointTupleForwarder() {
+ this(null);
+ }
+
public CheckpointTupleForwarder(IRichBolt bolt) {
this.bolt = bolt;
transactionRequestCount = new HashMap<>();
@@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
bolt.declareOutputFields(declarer);
+ declareCheckpointStream(declarer);
+ }
+
+ protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index ed55e1d..ef6c837 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -18,6 +18,11 @@
package org.apache.storm.topology;
import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
/**
* A bolt abstraction for supporting stateful computation. The state of the bolt is
@@ -27,5 +32,17 @@ import org.apache.storm.state.State;
* state updates. The stateful bolts are expected to anchor the tuples while emitting
* and ack the input tuples once they are processed.</p>
*/
-public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
+public interface IStatefulBolt<T extends State> extends IStatefulComponent<T> {
+ /**
+ * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)
+ */
+ void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
+ /**
+ * @see org.apache.storm.task.IBolt#execute(Tuple)
+ */
+ void execute(Tuple input);
+ /**
+ * @see org.apache.storm.task.IBolt#cleanup()
+ */
+ void cleanup();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 237305e..9873084 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,7 +39,6 @@ import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
import static org.apache.storm.spout.CheckPointState.Action.PREPARE;
import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
-
/**
* Wraps a {@link IStatefulBolt} and manages the state of the bolt.
*/
@@ -54,7 +52,6 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
private AckTrackingOutputCollector collector;
public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
- super(bolt);
this.bolt = bolt;
}
@@ -74,6 +71,23 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
@Override
+ public void cleanup() {
+ bolt.cleanup();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ bolt.declareOutputFields(declarer);
+ super.declareCheckpointStream(declarer);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return bolt.getComponentConfiguration();
+ }
+
+
+ @Override
protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
if (action == PREPARE) {
http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 5b7d499..92cad77 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -221,6 +221,20 @@ public class TopologyBuilder {
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the windowed bolt
+ * @return use the returned object to declare the inputs to this component
+ * @throws IllegalArgumentException if the delegated {@code setBolt(id, bolt, parallelism_hint)} overload rejects its arguments; no parallelism hint is passed by this overload
+ */
+ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
+ return setBolt(id, bolt, null);
+ }
+
+ /**
+ * Define a new bolt in this topology. This defines a windowed bolt, intended
+ * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
+ * is triggered for each window interval with the list of current events in the window.
+ *
+ * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+ * @param bolt the windowed bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
@@ -240,6 +254,24 @@ public class TopologyBuilder {
* </p>
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
+ * @return use the returned object to declare the inputs to this component
+ * @throws IllegalArgumentException if the delegated {@code setBolt(id, bolt, parallelism_hint)} overload rejects its arguments; no parallelism hint is passed by this overload
+ */
+ public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
+ return setBolt(id, bolt, null);
+ }
+
+ /**
+ * Define a new bolt in this topology. This defines a stateful bolt, that requires its
+ * state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
+ * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
+ * with its previously saved state.
+ * <p>
+ * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+ * are expected to anchor the tuples while emitting and ack the input tuples once they are processed.
+ * </p>
+ * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+ * @param bolt the stateful bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
@@ -257,6 +289,22 @@ public class TopologyBuilder {
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful windowed bolt
+ * @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
+ * @return use the returned object to declare the inputs to this component
+ * @throws IllegalArgumentException if the delegated {@code setBolt(id, bolt, parallelism_hint)} overload rejects its arguments; no parallelism hint is passed by this overload
+ */
+ public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
+ return setBolt(id, bolt, null);
+ }
+
+ /**
+ * Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful
+ * windowing operations. The {@link IStatefulWindowedBolt#execute(TupleWindow)} method is triggered
+ * for each window interval with the list of current events in the window. During initialization of
+ * this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved state.
+ *
+ * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+ * @param bolt the stateful windowed bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
* @return use the returned object to declare the inputs to this component
[4/4] storm git commit: add STORM-1714 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1714 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3399a4a4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3399a4a4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3399a4a4
Branch: refs/heads/1.x-branch
Commit: 3399a4a46bffaa708c30c6f3e8b15e666c6c6151
Parents: 0ba4bb5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 17:23:28 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:23:28 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3399a4a4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dbf2345..96c3fe3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.1
+ * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
* STORM-1683: only check non-system streams by default
* STORM-1680: Provide configuration to set min fetch size in KafkaSpout
* STORM-1649: Optimize Kryo instances creation in HBaseWindowsStore
[3/4] storm git commit: Merge branch 'STORM-1714-1.x' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-1714-1.x' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ba4bb51
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ba4bb51
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ba4bb51
Branch: refs/heads/1.x-branch
Commit: 0ba4bb5100efff705f3b62b78ad476c2591d7eb2
Parents: a7370a6 95e7afc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 17:23:12 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:23:12 2016 +0900
----------------------------------------------------------------------
.../topology/BaseStatefulBoltExecutor.java | 209 +++++++++++++++++++
.../topology/CheckpointTupleForwarder.java | 159 +-------------
.../apache/storm/topology/IStatefulBolt.java | 19 +-
.../storm/topology/StatefulBoltExecutor.java | 22 +-
.../apache/storm/topology/TopologyBuilder.java | 48 +++++
5 files changed, 301 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: [STORM-1714] refactored common logic into
BaseStatefulBoltExecutor
Posted by ka...@apache.org.
[STORM-1714] refactored common logic into BaseStatefulBoltExecutor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95e7afc2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95e7afc2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95e7afc2
Branch: refs/heads/1.x-branch
Commit: 95e7afc29cde1b7f16c1b0ab8ad65bbd9ed9a2af
Parents: 3b879d6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Apr 18 23:17:10 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:18:17 2016 +0900
----------------------------------------------------------------------
.../topology/BaseStatefulBoltExecutor.java | 209 +++++++++++++++++++
.../topology/CheckpointTupleForwarder.java | 165 +--------------
.../storm/topology/StatefulBoltExecutor.java | 4 +-
3 files changed, 218 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
new file mode 100644
index 0000000..b93a061
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.spout.CheckPointState;
+import org.apache.storm.spout.CheckpointSpout;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+
+/**
+ * Base class that abstracts the common logic for executing bolts in a stateful topology.
+ */
+public abstract class BaseStatefulBoltExecutor implements IRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class);
+ private final Map<TransactionRequest, Integer> transactionRequestCount;
+ private int checkPointInputTaskCount;
+ private long lastTxid = Long.MIN_VALUE;
+ protected OutputCollector collector;
+
+ public BaseStatefulBoltExecutor() {
+ transactionRequestCount = new HashMap<>();
+ }
+
+ protected void init(TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+ }
+
+ /**
+ * returns the total number of input checkpoint streams across
+ * all input tasks to this component.
+ */
+ private int getCheckpointInputTaskCount(TopologyContext context) {
+ int count = 0;
+ for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
+ if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
+ count += context.getComponentTasks(inputStream.get_componentId()).size();
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (CheckpointSpout.isCheckpoint(input)) {
+ processCheckpoint(input);
+ } else {
+ handleTuple(input);
+ }
+ }
+
+ /**
+ * Invokes handleCheckpoint once checkpoint tuple is received on
+ * all input checkpoint streams to this component.
+ */
+ private void processCheckpoint(Tuple input) {
+ CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
+ long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
+ if (shouldProcessTransaction(action, txid)) {
+ LOG.debug("Processing action {}, txid {}", action, txid);
+ try {
+ if (txid >= lastTxid) {
+ handleCheckpoint(input, action, txid);
+ if (action == ROLLBACK) {
+ lastTxid = txid - 1;
+ } else {
+ lastTxid = txid;
+ }
+ } else {
+ LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
+ collector.ack(input);
+ }
+ } catch (Throwable th) {
+ LOG.error("Got error while processing checkpoint tuple", th);
+ collector.fail(input);
+ collector.reportError(th);
+ }
+ } else {
+ LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
+ "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
+ collector.ack(input);
+ }
+ }
+
+ /**
+ * Checks if check points have been received from all tasks across
+ * all input streams to this component
+ */
+ private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
+ TransactionRequest request = new TransactionRequest(action, txid);
+ Integer count;
+ if ((count = transactionRequestCount.get(request)) == null) {
+ transactionRequestCount.put(request, 1);
+ count = 1;
+ } else {
+ transactionRequestCount.put(request, ++count);
+ }
+ if (count == checkPointInputTaskCount) {
+ transactionRequestCount.remove(request);
+ return true;
+ }
+ return false;
+ }
+
+ protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
+ }
+
+ /**
+ * Sub-classes can implement the logic for handling the tuple.
+ *
+ * @param input the input tuple
+ */
+ protected abstract void handleTuple(Tuple input);
+
+ /**
+ * Sub-classes can implement the logic for handling checkpoint tuple.
+ *
+ * @param checkpointTuple the checkpoint tuple
+ * @param action the action (prepare, commit, rollback or initstate)
+ * @param txid the transaction id.
+ */
+ protected abstract void handleCheckpoint(Tuple checkpointTuple, CheckPointState.Action action, long txid);
+
+ protected static class AnchoringOutputCollector extends OutputCollector {
+ AnchoringOutputCollector(IOutputCollector delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple) {
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+ throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+ }
+ }
+
+ private static class TransactionRequest {
+ private final CheckPointState.Action action;
+
+ private final long txid;
+
+ TransactionRequest(CheckPointState.Action action, long txid) {
+ this.action = action;
+ this.txid = txid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TransactionRequest that = (TransactionRequest) o;
+
+ if (txid != that.txid) return false;
+ return !(action != null ? !action.equals(that.action) : that.action != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = action != null ? action.hashCode() : 0;
+ result = 31 * result + (int) (txid ^ (txid >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TransactionRequest{" +
+ "action='" + action + '\'' +
+ ", txid=" + txid +
+ '}';
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 9d21c33..a510c0c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -45,41 +45,18 @@ import static org.apache.storm.spout.CheckpointSpout.*;
* can flow through the entire topology DAG.
* </p>
*/
-public class CheckpointTupleForwarder implements IRichBolt {
+public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointTupleForwarder.class);
private final IRichBolt bolt;
- private final Map<TransactionRequest, Integer> transactionRequestCount;
- private int checkPointInputTaskCount;
- private long lastTxid = Long.MIN_VALUE;
- private AnchoringOutputCollector collector;
-
- public CheckpointTupleForwarder() {
- this(null);
- }
public CheckpointTupleForwarder(IRichBolt bolt) {
this.bolt = bolt;
- transactionRequestCount = new HashMap<>();
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- init(context, collector);
- bolt.prepare(stormConf, context, this.collector);
- }
-
- protected void init(TopologyContext context, OutputCollector collector) {
- this.collector = new AnchoringOutputCollector(collector);
- this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
}
@Override
- public void execute(Tuple input) {
- if (CheckpointSpout.isCheckpoint(input)) {
- processCheckpoint(input);
- } else {
- handleTuple(input);
- }
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
+ init(context, new AnchoringOutputCollector(outputCollector));
+ bolt.prepare(stormConf, context, collector);
}
@Override
@@ -93,18 +70,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
declareCheckpointStream(declarer);
}
- protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
- declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
- }
-
@Override
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}
/**
- * Forwards the checkpoint tuple downstream. Sub-classes can override
- * with the logic for handling checkpoint tuple.
+ * Forwards the checkpoint tuple downstream.
*
* @param checkpointTuple the checkpoint tuple
* @param action the action (prepare, commit, rollback or initstate)
@@ -116,8 +88,8 @@ public class CheckpointTupleForwarder implements IRichBolt {
}
/**
- * Hands off tuple to the wrapped bolt to execute. Sub-classes can
- * override the behavior.
+ * Hands off tuple to the wrapped bolt to execute.
+ *
* <p>
* Right now tuples continue to get forwarded while waiting for checkpoints to arrive on other streams
* after checkpoint arrives on one of the streams. This can cause duplicates but still at least once.
@@ -128,127 +100,4 @@ public class CheckpointTupleForwarder implements IRichBolt {
protected void handleTuple(Tuple input) {
bolt.execute(input);
}
-
- /**
- * Invokes handleCheckpoint once checkpoint tuple is received on
- * all input checkpoint streams to this component.
- */
- private void processCheckpoint(Tuple input) {
- Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
- long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
- if (shouldProcessTransaction(action, txid)) {
- LOG.debug("Processing action {}, txid {}", action, txid);
- try {
- if (txid >= lastTxid) {
- handleCheckpoint(input, action, txid);
- if (action == ROLLBACK) {
- lastTxid = txid - 1;
- } else {
- lastTxid = txid;
- }
- } else {
- LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
- collector.ack(input);
- }
- } catch (Throwable th) {
- LOG.error("Got error while processing checkpoint tuple", th);
- collector.fail(input);
- collector.reportError(th);
- }
- } else {
- LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
- "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
- collector.ack(input);
- }
- }
-
- /**
- * returns the total number of input checkpoint streams across
- * all input tasks to this component.
- */
- private int getCheckpointInputTaskCount(TopologyContext context) {
- int count = 0;
- for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
- if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
- count += context.getComponentTasks(inputStream.get_componentId()).size();
- }
- }
- return count;
- }
-
- /**
- * Checks if check points have been received from all tasks across
- * all input streams to this component
- */
- private boolean shouldProcessTransaction(Action action, long txid) {
- TransactionRequest request = new TransactionRequest(action, txid);
- Integer count;
- if ((count = transactionRequestCount.get(request)) == null) {
- transactionRequestCount.put(request, 1);
- count = 1;
- } else {
- transactionRequestCount.put(request, ++count);
- }
- if (count == checkPointInputTaskCount) {
- transactionRequestCount.remove(request);
- return true;
- }
- return false;
- }
-
- private static class TransactionRequest {
- private final Action action;
- private final long txid;
-
- TransactionRequest(Action action, long txid) {
- this.action = action;
- this.txid = txid;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TransactionRequest that = (TransactionRequest) o;
-
- if (txid != that.txid) return false;
- return !(action != null ? !action.equals(that.action) : that.action != null);
-
- }
-
- @Override
- public int hashCode() {
- int result = action != null ? action.hashCode() : 0;
- result = 31 * result + (int) (txid ^ (txid >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TransactionRequest{" +
- "action='" + action + '\'' +
- ", txid=" + txid +
- '}';
- }
- }
-
-
- protected static class AnchoringOutputCollector extends OutputCollector {
- AnchoringOutputCollector(IOutputCollector delegate) {
- super(delegate);
- }
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple) {
- throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 9873084..01b04ab 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -42,7 +42,7 @@ import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
/**
* Wraps a {@link IStatefulBolt} and manages the state of the bolt.
*/
-public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwarder {
+public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class);
private final IStatefulBolt<T> bolt;
private State state;
@@ -78,7 +78,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
bolt.declareOutputFields(declarer);
- super.declareCheckpointStream(declarer);
+ declareCheckpointStream(declarer);
}
@Override