You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/03 16:38:05 UTC

[01/13] storm git commit: New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.

Repository: storm
Updated Branches:
  refs/heads/master 2aaa71809 -> fd64c3d2d


New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c6bec9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c6bec9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c6bec9b

Branch: refs/heads/master
Commit: 6c6bec9b1b2b07b5dfb5000297c11911eda5389d
Parents: 24b5eef
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:50:48 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:50:48 2014 +0200

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/tuple/Tuple.java     | 9 +++++++--
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 8 +++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6c6bec9b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 113b300..b3f5e56 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -153,7 +153,7 @@ public interface Tuple {
      * Gets the id of the component that created this tuple.
      */
     public String getSourceComponent();
-    
+
     /**
      * Gets the id of the task that created this tuple.
      */
@@ -163,7 +163,12 @@ public interface Tuple {
      * Gets the id of the stream that this tuple was emitted to.
      */
     public String getSourceStreamId();
-    
+
+    /**
+     * Returns if this tuple is a tick tuple or not.
+     */
+    public boolean isTick();
+
     /**
      * Gets the message id that associated with this tuple.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/6c6bec9b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 818eff1..7ff2c8c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.tuple;
 
+import backtype.storm.Constants;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.utils.IndifferentAccessMap;
@@ -212,7 +213,12 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public String getSourceStreamId() {
         return streamId;
     }
-    
+
+    public boolean isTick() {
+        return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
+               this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+    }
+
     public MessageId getMessageId() {
         return id;
     }


[09/13] storm git commit: Resolve NPE that can occur if there is no SourceComponent in a Tuple

Posted by bo...@apache.org.
Resolve NPE that can occur if there is no SourceComponent in a Tuple


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d2804aa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d2804aa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d2804aa

Branch: refs/heads/master
Commit: 4d2804aacb0a0abb60b5760a2f8bb3eb657ab67b
Parents: 401ebeb
Author: Niels Basjes <ni...@basjes.nl>
Authored: Thu Dec 11 12:35:01 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Thu Dec 11 12:35:01 2014 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4d2804aa/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7ff2c8c..40ad11c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -215,8 +215,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     }
 
     public boolean isTick() {
-        return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
-               this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+        return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
+               Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
     }
 
     public MessageId getMessageId() {


[02/13] storm git commit: KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.

Posted by bo...@apache.org.
KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/73e54a8f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/73e54a8f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/73e54a8f

Branch: refs/heads/master
Commit: 73e54a8f8b0ef9a62955c7c1cee20925d359772b
Parents: 6c6bec9
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:51:42 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:51:42 2014 +0200

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/73e54a8f/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index b6c3de4..2025766 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -89,6 +89,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
+        if (input.isTick()) {
+          return; // Do not try to send ticks to Kafka
+        }
+
         K key = null;
         V message = null;
         String topic = null;


[12/13] storm git commit: Merge branch 'STORM-512' of https://github.com/nielsbasjes/storm into STORM-512

Posted by bo...@apache.org.
Merge branch 'STORM-512' of https://github.com/nielsbasjes/storm into STORM-512

STORM-512: KafkaBolt doesn't handle ticks properly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25eb1418
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25eb1418
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25eb1418

Branch: refs/heads/master
Commit: 25eb1418b87840b3b07e2045877fef7a0b2f28da
Parents: 2aaa718 1ca5f76
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 2 14:34:04 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 2 14:34:04 2015 -0500

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  4 +--
 .../storm/starter/bolt/RollingCountBolt.java    |  4 +--
 .../jvm/storm/starter/util/TupleHelpers.java    | 33 ------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  5 +++
 .../jvm/backtype/storm/utils/TupleUtils.java    | 35 ++++++++++++++++++++
 .../trident/topology/TridentBoltExecutor.java   |  3 +-
 6 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/25eb1418/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------


[07/13] storm git commit: Code cleanup in mocking storm-starter tests

Posted by bo...@apache.org.
Code cleanup in mocking storm-starter tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e7a7e10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e7a7e10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e7a7e10

Branch: refs/heads/master
Commit: 4e7a7e102723deb0f827946db28a88f7276b113e
Parents: 71bd4a4
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:56:11 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 13:56:11 2014 +0100

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java           | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4e7a7e10/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 3180fd3..374288e 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -29,9 +29,7 @@ public final class MockTupleHelpers {
   }
 
   public static Tuple mockTickTuple() {
-    Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
-    when(tuple.isTick()).thenReturn(true);
-    return tuple;
+    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
   }
 
   public static Tuple mockTuple(String componentId, String streamId) {


[13/13] storm git commit: Added STORM-512 to Changelog and removed unneeded import

Posted by bo...@apache.org.
Added STORM-512 to Changelog and removed unneeded import


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd64c3d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd64c3d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd64c3d2

Branch: refs/heads/master
Commit: fd64c3d2dc61838c1e5c705426b8165068e12c3e
Parents: 25eb141
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 2 14:57:27 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 2 14:57:27 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                                        | 1 +
 .../src/jvm/storm/starter/bolt/RollingCountAggBolt.java             | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fd64c3d2/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0656691..e543a84 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
  * STORM-703: With hash key option for RedisMapState, only get values for keys in batch
  * STORM-691: Add basic lookup / persist bolts
  * STORM-727: Storm tests should succeed even if a storm process is running locally.
+ * STORM-512: KafkaBolt doesn't handle ticks properly
 
 ## 0.10.0
  * STORM-681: Auto insert license header with genthrift.sh

http://git-wip-us.apache.org/repos/asf/storm/blob/fd64c3d2/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
index e513b09..e222a97 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;


[11/13] storm git commit: Reducing the differences

Posted by bo...@apache.org.
Reducing the differences


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1ca5f765
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1ca5f765
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1ca5f765

Branch: refs/heads/master
Commit: 1ca5f7659ea807f1ae83694788e889ac5cae6669
Parents: 59fb8de
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:58:49 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Mon Dec 15 10:58:49 2014 +0100

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java           | 1 -
 storm-core/src/jvm/backtype/storm/tuple/Tuple.java               | 4 ++--
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java           | 3 +--
 .../src/jvm/storm/trident/topology/TridentBoltExecutor.java      | 2 +-
 4 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index eeaeeae..b253350 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -37,5 +37,4 @@ public final class MockTupleHelpers {
     when(tuple.getSourceStreamId()).thenReturn(streamId);
     return tuple;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index c644fec..113b300 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -153,7 +153,7 @@ public interface Tuple {
      * Gets the id of the component that created this tuple.
      */
     public String getSourceComponent();
-
+    
     /**
      * Gets the id of the task that created this tuple.
      */
@@ -163,7 +163,7 @@ public interface Tuple {
      * Gets the id of the stream that this tuple was emitted to.
      */
     public String getSourceStreamId();
-
+    
     /**
      * Gets the message id that associated with this tuple.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7829327..818eff1 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.tuple;
 
-import backtype.storm.Constants;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.utils.IndifferentAccessMap;
@@ -213,7 +212,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public String getSourceStreamId() {
         return streamId;
     }
-
+    
     public MessageId getMessageId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 41741a1..a23e555 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -300,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
+        if(TupleUtils.isTick(tuple)) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();


[03/13] storm git commit: Use isTick in all relevant places to avoid code duplication.

Posted by bo...@apache.org.
Use isTick in all relevant places to avoid code duplication.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8888ae63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8888ae63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8888ae63

Branch: refs/heads/master
Commit: 8888ae631360ad124af9c480d2f8b621b9c51727
Parents: 73e54a8
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:53:17 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:53:17 2014 +0200

----------------------------------------------------------------------
 storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8888ae63/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 4dfccc6..da4c1a5 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -299,7 +299,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+        if(tuple.isTick()) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();


[06/13] storm git commit: Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves

Posted by bo...@apache.org.
Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71bd4a43
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71bd4a43
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71bd4a43

Branch: refs/heads/master
Commit: 71bd4a435b882120f7542fcae678a52e20600539
Parents: cd47f1d
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:53:05 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 13:53:05 2014 +0100

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java  | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/71bd4a43/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index b253350..3180fd3 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,6 +19,7 @@ package storm.starter.tools;
 
 import backtype.storm.Constants;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
 
 import static org.mockito.Mockito.*;
 
@@ -28,13 +29,22 @@ public final class MockTupleHelpers {
   }
 
   public static Tuple mockTickTuple() {
-    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+    Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+    when(tuple.isTick()).thenReturn(true);
+    return tuple;
   }
 
   public static Tuple mockTuple(String componentId, String streamId) {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);
     when(tuple.getSourceStreamId()).thenReturn(streamId);
+    when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
+  
+  private static boolean isTick(String componentId, String streamId) {
+    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && 
+           streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
 }


[05/13] storm git commit: Added missing ack for the tick

Posted by bo...@apache.org.
Added missing ack for the tick


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd47f1d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd47f1d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd47f1d6

Branch: refs/heads/master
Commit: cd47f1d6e4165f8e3393aa5be01d02f4148c3216
Parents: 59b2a42
Author: Niels Basjes <nb...@bol.com>
Authored: Thu Oct 30 10:27:05 2014 +0100
Committer: Niels Basjes <nb...@bol.com>
Committed: Thu Oct 30 10:27:05 2014 +0100

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cd47f1d6/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 2025766..7de25db 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -90,6 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         if (input.isTick()) {
+          collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
 


[08/13] storm git commit: Code cleanup in mocking storm-starter tests

Posted by bo...@apache.org.
Code cleanup in mocking storm-starter tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/401ebebd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/401ebebd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/401ebebd

Branch: refs/heads/master
Commit: 401ebebdf8b201b87ff944cc3c2217cc30acd1fc
Parents: 4e7a7e1
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 14:00:33 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 14:00:33 2014 +0100

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java          | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/401ebebd/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 374288e..9e8629c 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,7 +19,6 @@ package storm.starter.tools;
 
 import backtype.storm.Constants;
 import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
 
 import static org.mockito.Mockito.*;
 
@@ -39,9 +38,9 @@ public final class MockTupleHelpers {
     when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
-  
+
   private static boolean isTick(String componentId, String streamId) {
-    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && 
+    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
            streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
   }
 


[04/13] storm git commit: Replaced TupleHelpers in the examples with the new tuple.isTick()

Posted by bo...@apache.org.
Replaced TupleHelpers in the examples with the new tuple.isTick()


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59b2a42a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59b2a42a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59b2a42a

Branch: refs/heads/master
Commit: 59b2a42af4419e9217849add60e62a0c863c1ece
Parents: 8888ae6
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 29 15:50:18 2014 +0100
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 29 15:50:18 2014 +0100

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  3 +-
 .../storm/starter/bolt/RollingCountBolt.java    |  3 +-
 .../jvm/storm/starter/util/TupleHelpers.java    | 33 --------------------
 3 files changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index cc5c0e7..83c2cfc 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
 import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -78,7 +77,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
    */
   @Override
   public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleHelpers.isTickTuple(tuple)) {
+    if (tuple.isTick()) {
       getLogger().debug("Received tick tuple, triggering emit of current rankings");
       emitRankings(collector);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f83906c..f023c0b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -95,7 +94,7 @@ public class RollingCountBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
-    if (TupleHelpers.isTickTuple(tuple)) {
+    if (tuple.isTick()) {
       LOG.debug("Received tick tuple, triggering emit of current window counts");
       emitCurrentWindowCounts();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
deleted file mode 100644
index 4ea669e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.util;
-
-import backtype.storm.Constants;
-import backtype.storm.tuple.Tuple;
-
-public final class TupleHelpers {
-
-  private TupleHelpers() {
-  }
-
-  public static boolean isTickTuple(Tuple tuple) {
-    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
-        Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
-}


[10/13] storm git commit: Refactor to move the isTick to a utility class

Posted by bo...@apache.org.
Refactor to move the isTick to a utility class


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59fb8ded
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59fb8ded
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59fb8ded

Branch: refs/heads/master
Commit: 59fb8ded9ef892792879efc94b12ea46ff924c41
Parents: 4d2804a
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Mon Dec 15 10:56:05 2014 +0100

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  3 +-
 .../storm/starter/bolt/RollingCountBolt.java    |  3 +-
 .../storm/starter/tools/MockTupleHelpers.java   |  6 ----
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  4 +--
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  5 ---
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  5 ---
 .../jvm/backtype/storm/utils/TupleUtils.java    | 35 ++++++++++++++++++++
 .../trident/topology/TridentBoltExecutor.java   |  3 +-
 8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.Rankings;
 
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
    */
   @Override
   public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       getLogger().debug("Received tick tuple, triggering emit of current rankings");
       emitRankings(collector);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       LOG.debug("Received tick tuple, triggering emit of current window counts");
       emitCurrentWindowCounts();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);
     when(tuple.getSourceStreamId()).thenReturn(streamId);
-    when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
 
-  private static boolean isTick(String componentId, String streamId) {
-    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
-           streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 7de25db..35c0da6 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
-        if (input.isTick()) {
-          collector.ack(input);
+        if (TupleUtils.isTick(input)) {
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index b3f5e56..c644fec 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -165,11 +165,6 @@ public interface Tuple {
     public String getSourceStreamId();
 
     /**
-     * Returns if this tuple is a tick tuple or not.
-     */
-    public boolean isTick();
-
-    /**
      * Gets the message id that associated with this tuple.
      */
     public MessageId getMessageId();

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return streamId;
     }
 
-    public boolean isTick() {
-        return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
-               Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
-    }
-
     public MessageId getMessageId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+
+  private TupleUtils() {
+    // No instantiation
+  }
+
+  public static boolean isTick(Tuple tuple) {
+    return tuple != null
+           && Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())
+           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.isTick()) {
+        if (TupleUtils.isTick(tuple)) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();