You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/03 16:38:05 UTC
[01/13] storm git commit: New method in the Tuple interface "boolean
isTuple()" for easier handling of TickTuples.
Repository: storm
Updated Branches:
refs/heads/master 2aaa71809 -> fd64c3d2d
New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c6bec9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c6bec9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c6bec9b
Branch: refs/heads/master
Commit: 6c6bec9b1b2b07b5dfb5000297c11911eda5389d
Parents: 24b5eef
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:50:48 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:50:48 2014 +0200
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 9 +++++++--
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 8 +++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6c6bec9b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 113b300..b3f5e56 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -153,7 +153,7 @@ public interface Tuple {
* Gets the id of the component that created this tuple.
*/
public String getSourceComponent();
-
+
/**
* Gets the id of the task that created this tuple.
*/
@@ -163,7 +163,12 @@ public interface Tuple {
* Gets the id of the stream that this tuple was emitted to.
*/
public String getSourceStreamId();
-
+
+ /**
+ * Returns if this tuple is a tick tuple or not.
+ */
+ public boolean isTick();
+
/**
* Gets the message id that associated with this tuple.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/6c6bec9b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 818eff1..7ff2c8c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.tuple;
+import backtype.storm.Constants;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.utils.IndifferentAccessMap;
@@ -212,7 +213,12 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public String getSourceStreamId() {
return streamId;
}
-
+
+ public boolean isTick() {
+ return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
+ this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+ }
+
public MessageId getMessageId() {
return id;
}
[09/13] storm git commit: Resolve NPE that can occur if there is no
SourceComponent in a Tuple
Posted by bo...@apache.org.
Resolve NPE that can occur if there is no SourceComponent in a Tuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d2804aa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d2804aa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d2804aa
Branch: refs/heads/master
Commit: 4d2804aacb0a0abb60b5760a2f8bb3eb657ab67b
Parents: 401ebeb
Author: Niels Basjes <ni...@basjes.nl>
Authored: Thu Dec 11 12:35:01 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Thu Dec 11 12:35:01 2014 +0100
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4d2804aa/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7ff2c8c..40ad11c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -215,8 +215,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
}
public boolean isTick() {
- return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
- this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+ return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
+ Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
}
public MessageId getMessageId() {
[02/13] storm git commit: KafkaBolt no longer tries to
map/process/send Tick Tuples to Kafka.
Posted by bo...@apache.org.
KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/73e54a8f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/73e54a8f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/73e54a8f
Branch: refs/heads/master
Commit: 73e54a8f8b0ef9a62955c7c1cee20925d359772b
Parents: 6c6bec9
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:51:42 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:51:42 2014 +0200
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/73e54a8f/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index b6c3de4..2025766 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -89,6 +89,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
+ if (input.isTick()) {
+ return; // Do not try to send ticks to Kafka
+ }
+
K key = null;
V message = null;
String topic = null;
[12/13] storm git commit: Merge branch 'STORM-512' of
https://github.com/nielsbasjes/storm into STORM-512
Posted by bo...@apache.org.
Merge branch 'STORM-512' of https://github.com/nielsbasjes/storm into STORM-512
STORM-512: KafkaBolt doesn't handle ticks properly
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25eb1418
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25eb1418
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25eb1418
Branch: refs/heads/master
Commit: 25eb1418b87840b3b07e2045877fef7a0b2f28da
Parents: 2aaa718 1ca5f76
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 2 14:34:04 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 2 14:34:04 2015 -0500
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 4 +--
.../storm/starter/bolt/RollingCountBolt.java | 4 +--
.../jvm/storm/starter/util/TupleHelpers.java | 33 ------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 5 +++
.../jvm/backtype/storm/utils/TupleUtils.java | 35 ++++++++++++++++++++
.../trident/topology/TridentBoltExecutor.java | 3 +-
6 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/25eb1418/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
[07/13] storm git commit: Code cleanup in mocking storm-starter tests
Posted by bo...@apache.org.
Code cleanup in mocking storm-starter tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e7a7e10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e7a7e10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e7a7e10
Branch: refs/heads/master
Commit: 4e7a7e102723deb0f827946db28a88f7276b113e
Parents: 71bd4a4
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:56:11 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 13:56:11 2014 +0100
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4e7a7e10/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 3180fd3..374288e 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -29,9 +29,7 @@ public final class MockTupleHelpers {
}
public static Tuple mockTickTuple() {
- Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
- when(tuple.isTick()).thenReturn(true);
- return tuple;
+ return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
}
public static Tuple mockTuple(String componentId, String streamId) {
[13/13] storm git commit: Added STORM-512 to Changelog and removed
unneeded import
Posted by bo...@apache.org.
Added STORM-512 to Changelog and removed unneeded import
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd64c3d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd64c3d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd64c3d2
Branch: refs/heads/master
Commit: fd64c3d2dc61838c1e5c705426b8165068e12c3e
Parents: 25eb141
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 2 14:57:27 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 2 14:57:27 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
.../src/jvm/storm/starter/bolt/RollingCountAggBolt.java | 1 -
2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd64c3d2/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0656691..e543a84 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
* STORM-703: With hash key option for RedisMapState, only get values for keys in batch
* STORM-691: Add basic lookup / persist bolts
* STORM-727: Storm tests should succeed even if a storm process is running locally.
+ * STORM-512: KafkaBolt doesn't handle ticks properly
## 0.10.0
* STORM-681: Auto insert license header with genthrift.sh
http://git-wip-us.apache.org/repos/asf/storm/blob/fd64c3d2/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
index e513b09..e222a97 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
[11/13] storm git commit: Reducing the differences
Posted by bo...@apache.org.
Reducing the differences
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1ca5f765
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1ca5f765
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1ca5f765
Branch: refs/heads/master
Commit: 1ca5f7659ea807f1ae83694788e889ac5cae6669
Parents: 59fb8de
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:58:49 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Mon Dec 15 10:58:49 2014 +0100
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 1 -
storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 4 ++--
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 3 +--
.../src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
4 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index eeaeeae..b253350 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -37,5 +37,4 @@ public final class MockTupleHelpers {
when(tuple.getSourceStreamId()).thenReturn(streamId);
return tuple;
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index c644fec..113b300 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -153,7 +153,7 @@ public interface Tuple {
* Gets the id of the component that created this tuple.
*/
public String getSourceComponent();
-
+
/**
* Gets the id of the task that created this tuple.
*/
@@ -163,7 +163,7 @@ public interface Tuple {
* Gets the id of the stream that this tuple was emitted to.
*/
public String getSourceStreamId();
-
+
/**
* Gets the message id that associated with this tuple.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7829327..818eff1 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.tuple;
-import backtype.storm.Constants;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.utils.IndifferentAccessMap;
@@ -213,7 +212,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public String getSourceStreamId() {
return streamId;
}
-
+
public MessageId getMessageId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1ca5f765/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 41741a1..a23e555 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -300,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if (TupleUtils.isTick(tuple)) {
+ if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
[03/13] storm git commit: Use isTick in all relevant places to avoid
code duplication.
Posted by bo...@apache.org.
Use isTick in all relevant places to avoid code duplication.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8888ae63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8888ae63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8888ae63
Branch: refs/heads/master
Commit: 8888ae631360ad124af9c480d2f8b621b9c51727
Parents: 73e54a8
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:53:17 2014 +0200
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 1 11:53:17 2014 +0200
----------------------------------------------------------------------
storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8888ae63/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 4dfccc6..da4c1a5 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -299,7 +299,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ if(tuple.isTick()) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
[06/13] storm git commit: Fixed the tests in storm-starter that do
not use the actual TupleImpl but mock everything themselves
Posted by bo...@apache.org.
Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/71bd4a43
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/71bd4a43
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/71bd4a43
Branch: refs/heads/master
Commit: 71bd4a435b882120f7542fcae678a52e20600539
Parents: cd47f1d
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:53:05 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 13:53:05 2014 +0100
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/71bd4a43/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index b253350..3180fd3 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,6 +19,7 @@ package storm.starter.tools;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
import static org.mockito.Mockito.*;
@@ -28,13 +29,22 @@ public final class MockTupleHelpers {
}
public static Tuple mockTickTuple() {
- return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ when(tuple.isTick()).thenReturn(true);
+ return tuple;
}
public static Tuple mockTuple(String componentId, String streamId) {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
when(tuple.getSourceStreamId()).thenReturn(streamId);
+ when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
+
+ private static boolean isTick(String componentId, String streamId) {
+ return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
+ streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
+ }
+
}
[05/13] storm git commit: Added missing ack for the tick
Posted by bo...@apache.org.
Added missing ack for the tick
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd47f1d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd47f1d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd47f1d6
Branch: refs/heads/master
Commit: cd47f1d6e4165f8e3393aa5be01d02f4148c3216
Parents: 59b2a42
Author: Niels Basjes <nb...@bol.com>
Authored: Thu Oct 30 10:27:05 2014 +0100
Committer: Niels Basjes <nb...@bol.com>
Committed: Thu Oct 30 10:27:05 2014 +0100
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd47f1d6/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 2025766..7de25db 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -90,6 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
if (input.isTick()) {
+ collector.ack(input);
return; // Do not try to send ticks to Kafka
}
[08/13] storm git commit: Code cleanup in mocking storm-starter tests
Posted by bo...@apache.org.
Code cleanup in mocking storm-starter tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/401ebebd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/401ebebd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/401ebebd
Branch: refs/heads/master
Commit: 401ebebdf8b201b87ff944cc3c2217cc30acd1fc
Parents: 4e7a7e1
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 14:00:33 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Tue Dec 9 14:00:33 2014 +0100
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/401ebebd/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 374288e..9e8629c 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,7 +19,6 @@ package storm.starter.tools;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
import static org.mockito.Mockito.*;
@@ -39,9 +38,9 @@ public final class MockTupleHelpers {
when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
-
+
private static boolean isTick(String componentId, String streamId) {
- return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
+ return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
}
[04/13] storm git commit: Replaced TupleHelpers in the examples with
the new tuple.isTick()
Posted by bo...@apache.org.
Replaced TupleHelpers in the examples with the new tuple.isTick()
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59b2a42a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59b2a42a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59b2a42a
Branch: refs/heads/master
Commit: 59b2a42af4419e9217849add60e62a0c863c1ece
Parents: 8888ae6
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 29 15:50:18 2014 +0100
Committer: Niels Basjes <nb...@bol.com>
Committed: Wed Oct 29 15:50:18 2014 +0100
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 3 +-
.../storm/starter/bolt/RollingCountBolt.java | 3 +-
.../jvm/storm/starter/util/TupleHelpers.java | 33 --------------------
3 files changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index cc5c0e7..83c2cfc 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
@@ -78,7 +77,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
*/
@Override
public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleHelpers.isTickTuple(tuple)) {
+ if (tuple.isTick()) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
emitRankings(collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f83906c..f023c0b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
@@ -95,7 +94,7 @@ public class RollingCountBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- if (TupleHelpers.isTickTuple(tuple)) {
+ if (tuple.isTick()) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59b2a42a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
deleted file mode 100644
index 4ea669e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.util;
-
-import backtype.storm.Constants;
-import backtype.storm.tuple.Tuple;
-
-public final class TupleHelpers {
-
- private TupleHelpers() {
- }
-
- public static boolean isTickTuple(Tuple tuple) {
- return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
- Constants.SYSTEM_TICK_STREAM_ID);
- }
-
-}
[10/13] storm git commit: Refactor to move the isTick to a utility
class
Posted by bo...@apache.org.
Refactor to move the isTick to a utility class
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59fb8ded
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59fb8ded
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59fb8ded
Branch: refs/heads/master
Commit: 59fb8ded9ef892792879efc94b12ea46ff924c41
Parents: 4d2804a
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: Niels Basjes <ni...@basjes.nl>
Committed: Mon Dec 15 10:56:05 2014 +0100
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 3 +-
.../storm/starter/bolt/RollingCountBolt.java | 3 +-
.../storm/starter/tools/MockTupleHelpers.java | 6 ----
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 +--
.../src/jvm/backtype/storm/tuple/Tuple.java | 5 ---
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 5 ---
.../jvm/backtype/storm/utils/TupleUtils.java | 35 ++++++++++++++++++++
.../trident/topology/TridentBoltExecutor.java | 3 +-
8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
*/
@Override
public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
emitRankings(collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
when(tuple.getSourceStreamId()).thenReturn(streamId);
- when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
- private static boolean isTick(String componentId, String streamId) {
- return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
- streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 7de25db..35c0da6 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
- if (input.isTick()) {
- collector.ack(input);
+ if (TupleUtils.isTick(input)) {
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index b3f5e56..c644fec 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -165,11 +165,6 @@ public interface Tuple {
public String getSourceStreamId();
/**
- * Returns if this tuple is a tick tuple or not.
- */
- public boolean isTick();
-
- /**
* Gets the message id that associated with this tuple.
*/
public MessageId getMessageId();
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
return streamId;
}
- public boolean isTick() {
- return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
- Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
- }
-
public MessageId getMessageId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+
+ private TupleUtils() {
+ // No instantiation
+ }
+
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/59fb8ded/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();