You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/09/20 07:43:41 UTC

[1/2] storm git commit: [STORM-2731] - Simple checks in Storm Windowing

Repository: storm
Updated Branches:
  refs/heads/master 62dec63b5 -> 50d55a951


[STORM-2731] - Simple checks in Storm Windowing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91ac5c80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91ac5c80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91ac5c80

Branch: refs/heads/master
Commit: 91ac5c801a0f000a4ae2313437d4de79ea5bd2bc
Parents: 32389d7
Author: Jerry Peng <je...@Jerrys-MacBook-Pro.local>
Authored: Thu Sep 7 15:32:39 2017 -0700
Committer: Jerry Peng <je...@Jerrys-MacBook-Pro.local>
Committed: Tue Sep 19 13:58:40 2017 -0700

----------------------------------------------------------------------
 .../storm/topology/base/BaseWindowedBolt.java   | 32 ++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/91ac5c80/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index c445e9d..918760a 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -89,7 +89,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
         public final int value;
 
         public Duration(int value, TimeUnit timeUnit) {
-            this.value = (int) timeUnit.toMillis(value);
+            if (value < 0) {
+                throw new IllegalArgumentException("Duration cannot be negative");
+            }
+            long longVal = timeUnit.toMillis(value);
+            if (longVal > (long) Integer.MAX_VALUE) {
+                throw new IllegalArgumentException("Duration is too long");
+            }
+            this.value = (int)longVal;
         }
 
         /**
@@ -165,12 +172,14 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
                     '}';
         }
     }
-
     protected BaseWindowedBolt() {
         windowConfiguration = new HashMap<>();
     }
 
     private BaseWindowedBolt withWindowLength(Count count) {
+        if (count == null) {
+            throw new IllegalArgumentException("Window length count cannot be set null");
+        }
         if (count.value <= 0) {
             throw new IllegalArgumentException("Window length must be positive [" + count + "]");
         }
@@ -179,6 +188,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     private BaseWindowedBolt withWindowLength(Duration duration) {
+        if (duration == null) {
+            throw new IllegalArgumentException("Window length duration cannot be set null");
+        }
         if (duration.value <= 0) {
             throw new IllegalArgumentException("Window length must be positive [" + duration + "]");
         }
@@ -188,6 +200,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     private BaseWindowedBolt withSlidingInterval(Count count) {
+        if (count == null) {
+            throw new IllegalArgumentException("Sliding interval count cannot be set null");
+        }
         if (count.value <= 0) {
             throw new IllegalArgumentException("Sliding interval must be positive [" + count + "]");
         }
@@ -196,6 +211,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     private BaseWindowedBolt withSlidingInterval(Duration duration) {
+        if (duration == null) {
+            throw new IllegalArgumentException("Sliding interval duration cannot be set null");
+        }
         if (duration.value <= 0) {
             throw new IllegalArgumentException("Sliding interval must be positive [" + duration + "]");
         }
@@ -282,6 +300,7 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     /**
      * Specify a field in the tuple that represents the timestamp as a long value. If this
      * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
+     * The field MUST contain a timestamp in milliseconds
      *
      * @param fieldName the name of the field that contains the timestamp
      */
@@ -295,6 +314,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
      * @param timestampExtractor the {@link TimestampExtractor} implementation
      */
     public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
+        if (timestampExtractor == null) {
+            throw new IllegalArgumentException("Timestamp extractor cannot be set to null");
+        }
         if (this.timestampExtractor != null) {
             throw new IllegalArgumentException("Window is already configured with a timestamp extractor: " + timestampExtractor);
         }
@@ -316,6 +338,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
      * @param streamId the name of the stream used to emit late tuples on
      */
     public BaseWindowedBolt withLateTupleStream(String streamId) {
+        if (streamId == null) {
+            throw new IllegalArgumentException("Cannot set late tuple stream id to null");
+        }
         windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
         return this;
     }
@@ -339,6 +364,9 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
      * @param interval the interval at which watermark events are generated
      */
     public BaseWindowedBolt withWatermarkInterval(Duration interval) {
+        if (interval == null) {
+            throw new IllegalArgumentException("Watermark interval cannot be set null");
+        }
         windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
         return this;
     }


[2/2] storm git commit: Merge branch 'STORM-2731' of https://github.com/jerrypeng/storm into STORM-2731-merge

Posted by sr...@apache.org.
Merge branch 'STORM-2731' of https://github.com/jerrypeng/storm into STORM-2731-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/50d55a95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/50d55a95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/50d55a95

Branch: refs/heads/master
Commit: 50d55a951bcf36afadf281f56099fe31d4387025
Parents: 62dec63 91ac5c8
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Sep 20 07:59:33 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Sep 20 07:59:33 2017 +0200

----------------------------------------------------------------------
 .../storm/topology/base/BaseWindowedBolt.java   | 32 ++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------