You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2017/05/30 10:26:18 UTC

[1/2] storm git commit: STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 64aa427e5 -> 4ed0b54d1


STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/babf3f1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/babf3f1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/babf3f1d

Branch: refs/heads/1.x-branch
Commit: babf3f1d7fe18154b01af05dd70624ff44bface7
Parents: 64aa427
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue May 16 21:27:41 2017 +0200
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 15:55:46 2017 +0530

----------------------------------------------------------------------
 .../windowing/WaterMarkEventGenerator.java      | 28 +++++++++++++-------
 .../topology/WindowedBoltExecutorTest.java      | 26 +++++++++---------
 2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/babf3f1d/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
index e1df72c..d4f431f 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
@@ -15,12 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.windowing;
 
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.topology.FailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.windowing;
 
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +26,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tracks tuples across input streams and periodically emits watermark events.
@@ -46,15 +47,22 @@ public class WaterMarkEventGenerator<T> implements Runnable {
     private final ScheduledExecutorService executorService;
     private final int interval;
     private ScheduledFuture<?> executorFuture;
-    private long lastWaterMarkTs = 0;
+    private volatile long lastWaterMarkTs;
 
-    public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval,
-                                   int eventTsLag, Set<GlobalStreamId> inputStreams) {
+    /**
+     * Creates a new WaterMarkEventGenerator.
+     * @param windowManager The window manager this generator will submit watermark events to
+     * @param intervalMs The generator will check if it should generate a watermark event at this interval
+     * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
+     * @param inputStreams The input streams this generator is expected to handle
+     */
+    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
+                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
         this.windowManager = windowManager;
         streamToTs = new ConcurrentHashMap<>();
         executorService = Executors.newSingleThreadScheduledExecutor();
-        this.interval = interval;
-        this.eventTsLag = eventTsLag;
+        this.interval = intervalMs;
+        this.eventTsLag = eventTsLagMs;
         this.inputStreams = inputStreams;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/babf3f1d/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index a051a6b..76fdf33 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -28,7 +28,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Time;
 import org.apache.storm.windowing.TupleWindow;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,7 +51,7 @@ import static org.junit.Assert.fail;
  * Unit tests for {@link WindowedBoltExecutor}
  */
 public class WindowedBoltExecutorTest {
-
+    
     private WindowedBoltExecutor executor;
     private TestWindowedBolt testWindowedBolt;
 
@@ -124,8 +123,8 @@ public class WindowedBoltExecutorTest {
 
     @Test
     public void testExecuteWithTs() throws Exception {
-        long[] timstamps = {603, 605, 607, 618, 626, 636};
-        for (long ts : timstamps) {
+        long[] timestamps = {603, 605, 607, 618, 626, 636};
+        for (long ts : timestamps) {
             executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
         }
         //Thread.sleep(120);
@@ -172,7 +171,7 @@ public class WindowedBoltExecutorTest {
 
 
     @Test
-    public void testPrepareLateTUpleStreamWithoutBuilder() throws Exception {
+    public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
         conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
@@ -209,20 +208,21 @@ public class WindowedBoltExecutorTest {
         conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
         conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
         conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
-        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+        //Trigger manually to avoid timing issues
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000);
         executor.prepare(conf, context, outputCollector);
 
-        long[] timstamps = {603, 605, 607, 618, 626, 636, 600};
-        List<Tuple> tuples = new ArrayList<>(timstamps.length);
+        long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+        List<Tuple> tuples = new ArrayList<>(timestamps.length);
 
-        executor.waterMarkEventGenerator.run();
-        for (long ts : timstamps) {
+        for (long ts : timestamps) {
             Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
             tuples.add(tuple);
             executor.execute(tuple);
-            Time.sleep(10);
-        }
-
+            
+            //Update the watermark to this timestamp
+            executor.waterMarkEventGenerator.run();
+        } 
         System.out.println(testWindowedBolt.tupleWindows);
         Tuple tuple = tuples.get(tuples.size() - 1);
         Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));


[2/2] storm git commit: Added STORM-2516 to CHANGELOG.md

Posted by ar...@apache.org.
Added STORM-2516 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4ed0b54d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4ed0b54d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4ed0b54d

Branch: refs/heads/1.x-branch
Commit: 4ed0b54d1fa3a00a5f20fab42f644fb3015f45c2
Parents: babf3f1
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 15:56:14 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 15:56:14 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4ed0b54d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 004d888..0d28a2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.1
+ * STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder
  * STORM-2489: Overlap and data loss on WindowedBolt based on Duration
  * STORM-2528: Bump log4j version to 2.8.2
  * STORM-2527: Initialize java.sql.DriverManager earlier to avoid deadlock