You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/09/20 07:43:41 UTC
[1/2] storm git commit: [STORM-2731] - Simple checks in Storm Windowing
Repository: storm
Updated Branches:
refs/heads/master 62dec63b5 -> 50d55a951
[STORM-2731] - Simple checks in Storm Windowing
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91ac5c80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91ac5c80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91ac5c80
Branch: refs/heads/master
Commit: 91ac5c801a0f000a4ae2313437d4de79ea5bd2bc
Parents: 32389d7
Author: Jerry Peng <je...@Jerrys-MacBook-Pro.local>
Authored: Thu Sep 7 15:32:39 2017 -0700
Committer: Jerry Peng <je...@Jerrys-MacBook-Pro.local>
Committed: Tue Sep 19 13:58:40 2017 -0700
----------------------------------------------------------------------
.../storm/topology/base/BaseWindowedBolt.java | 32 ++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/91ac5c80/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index c445e9d..918760a 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -89,7 +89,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
public final int value;
public Duration(int value, TimeUnit timeUnit) {
- this.value = (int) timeUnit.toMillis(value);
+ if (value < 0) {
+ throw new IllegalArgumentException("Duration cannot be negative");
+ }
+ long longVal = timeUnit.toMillis(value);
+ if (longVal > (long) Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Duration is too long");
+ }
+ this.value = (int)longVal;
}
/**
@@ -165,12 +172,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
'}';
}
}
-
protected BaseWindowedBolt() {
windowConfiguration = new HashMap<>();
}
private BaseWindowedBolt withWindowLength(Count count) {
+ if (count == null) {
+ throw new IllegalArgumentException("Window length count cannot be set null");
+ }
if (count.value <= 0) {
throw new IllegalArgumentException("Window length must be positive [" + count + "]");
}
@@ -179,6 +188,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
}
private BaseWindowedBolt withWindowLength(Duration duration) {
+ if (duration == null) {
+ throw new IllegalArgumentException("Window length duration cannot be set null");
+ }
if (duration.value <= 0) {
throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
}
@@ -188,6 +200,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
}
private BaseWindowedBolt withSlidingInterval(Count count) {
+ if (count == null) {
+ throw new IllegalArgumentException("Sliding interval count cannot be set null");
+ }
if (count.value <= 0) {
throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
}
@@ -196,6 +211,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
}
private BaseWindowedBolt withSlidingInterval(Duration duration) {
+ if (duration == null) {
+ throw new IllegalArgumentException("Sliding interval duration cannot be set null");
+ }
if (duration.value <= 0) {
throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
}
@@ -282,6 +300,7 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+ * The field MUST contain a timestamp in milliseconds.
*
* @param fieldName the name of the field that contains the timestamp
*/
@@ -295,6 +314,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+ if (timestampExtractor == null) {
+ throw new IllegalArgumentException("Timestamp extractor cannot be set to null");
+ }
if (this.timestampExtractor != null) {
throw new IllegalArgumentException("Window is already configured with a timestamp extractor: " + timestampExtractor);
}
@@ -316,6 +338,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
* @param streamId the name of the stream used to emit late tuples on
*/
public BaseWindowedBolt withLateTupleStream(String streamId) {
+ if (streamId == null) {
+ throw new IllegalArgumentException("Cannot set late tuple stream id to null");
+ }
windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
return this;
}
@@ -339,6 +364,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+ if (interval == null) {
+ throw new IllegalArgumentException("Watermark interval cannot be set null");
+ }
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
return this;
}
[2/2] storm git commit: Merge branch 'STORM-2731' of https://github.com/jerrypeng/storm into STORM-2731-merge
Posted by sr...@apache.org.
Merge branch 'STORM-2731' of https://github.com/jerrypeng/storm into STORM-2731-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/50d55a95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/50d55a95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/50d55a95
Branch: refs/heads/master
Commit: 50d55a951bcf36afadf281f56099fe31d4387025
Parents: 62dec63 91ac5c8
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Sep 20 07:59:33 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Sep 20 07:59:33 2017 +0200
----------------------------------------------------------------------
.../storm/topology/base/BaseWindowedBolt.java | 32 ++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------