You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:39 UTC

[19/27] storm git commit: Refactor to move the isTick to a utility class

Refactor to move the isTick to a utility class


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e49d91b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e49d91b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e49d91b

Branch: refs/heads/0.10.x-branch
Commit: 0e49d91be5b765eccf3e05fe9c44f53762f17ddf
Parents: 5faa782
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  3 +-
 .../storm/starter/bolt/RollingCountBolt.java    |  3 +-
 .../storm/starter/tools/MockTupleHelpers.java   |  6 ----
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  4 +--
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  5 ---
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  5 ---
 .../jvm/backtype/storm/utils/TupleUtils.java    | 35 ++++++++++++++++++++
 .../trident/topology/TridentBoltExecutor.java   |  3 +-
 8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.Rankings;
 
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
    */
   @Override
   public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       getLogger().debug("Received tick tuple, triggering emit of current rankings");
       emitRankings(collector);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       LOG.debug("Received tick tuple, triggering emit of current window counts");
       emitCurrentWindowCounts();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);
     when(tuple.getSourceStreamId()).thenReturn(streamId);
-    when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
 
-  private static boolean isTick(String componentId, String streamId) {
-    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
-           streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0a1e5fe..a8c4321 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
-        if (input.isTick()) {
-          collector.ack(input);
+        if (TupleUtils.isTick(input)) {
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 7ea93b9..a31b52b 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -54,11 +54,6 @@ public interface Tuple extends ITuple{
     public String getSourceStreamId();
 
     /**
-     * Returns if this tuple is a tick tuple or not.
-     */
-    public boolean isTick();
-
-    /**
      * Gets the message id that associated with this tuple.
      */
     public MessageId getMessageId();

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return streamId;
     }
 
-    public boolean isTick() {
-        return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
-               Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
-    }
-
     public MessageId getMessageId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+
+  private TupleUtils() {
+    // No instantiation
+  }
+
+  public static boolean isTick(Tuple tuple) {
+    return tuple != null
+           && Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())
+           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.isTick()) {
+        if (TupleUtils.isTick(tuple)) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();