You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:39 UTC
[19/27] storm git commit: Refactor to move the isTick to a utility class
Refactor to move the isTick to a utility class
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e49d91b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e49d91b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e49d91b
Branch: refs/heads/0.10.x-branch
Commit: 0e49d91be5b765eccf3e05fe9c44f53762f17ddf
Parents: 5faa782
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 3 +-
.../storm/starter/bolt/RollingCountBolt.java | 3 +-
.../storm/starter/tools/MockTupleHelpers.java | 6 ----
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 +--
.../src/jvm/backtype/storm/tuple/Tuple.java | 5 ---
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 5 ---
.../jvm/backtype/storm/utils/TupleUtils.java | 35 ++++++++++++++++++++
.../trident/topology/TridentBoltExecutor.java | 3 +-
8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
*/
@Override
public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
emitRankings(collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
when(tuple.getSourceStreamId()).thenReturn(streamId);
- when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
- private static boolean isTick(String componentId, String streamId) {
- return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
- streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0a1e5fe..a8c4321 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
- if (input.isTick()) {
- collector.ack(input);
+ if (TupleUtils.isTick(input)) {
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 7ea93b9..a31b52b 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -54,11 +54,6 @@ public interface Tuple extends ITuple{
public String getSourceStreamId();
/**
- * Returns if this tuple is a tick tuple or not.
- */
- public boolean isTick();
-
- /**
* Gets the message id that associated with this tuple.
*/
public MessageId getMessageId();
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
return streamId;
}
- public boolean isTick() {
- return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
- Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
- }
-
public MessageId getMessageId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+ // Static helper methods for inspecting Tuple instances; the private constructor below prevents instantiation.
+ private TupleUtils() {
+ // No instantiation
+ }
+ // Returns true only when tuple is non-null, its source component equals Constants.SYSTEM_COMPONENT_ID and its source stream equals Constants.SYSTEM_TICK_STREAM_ID; a null tuple yields false.
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();