You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2017/05/30 10:26:18 UTC
[1/2] storm git commit: STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder
Repository: storm
Updated Branches:
refs/heads/1.x-branch 64aa427e5 -> 4ed0b54d1
STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/babf3f1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/babf3f1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/babf3f1d
Branch: refs/heads/1.x-branch
Commit: babf3f1d7fe18154b01af05dd70624ff44bface7
Parents: 64aa427
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue May 16 21:27:41 2017 +0200
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 15:55:46 2017 +0530
----------------------------------------------------------------------
.../windowing/WaterMarkEventGenerator.java | 28 +++++++++++++-------
.../topology/WindowedBoltExecutorTest.java | 26 +++++++++---------
2 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/babf3f1d/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
index e1df72c..d4f431f 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
@@ -15,12 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.windowing;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.topology.FailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.windowing;
import java.util.Map;
import java.util.Set;
@@ -30,6 +26,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tracks tuples across input streams and periodically emits watermark events.
@@ -46,15 +47,22 @@ public class WaterMarkEventGenerator<T> implements Runnable {
private final ScheduledExecutorService executorService;
private final int interval;
private ScheduledFuture<?> executorFuture;
- private long lastWaterMarkTs = 0;
+ private volatile long lastWaterMarkTs;
- public WaterMarkEventGenerator(WindowManager<T> windowManager, int interval,
- int eventTsLag, Set<GlobalStreamId> inputStreams) {
+ /**
+ * Creates a new WatermarkEventGenerator.
+ * @param windowManager The window manager this generator will submit watermark events to
+ * @param intervalMs The generator will check if it should generate a watermark event with this interval
+ * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
+ * @param inputStreams The input streams this generator is expected to handle
+ */
+ public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
+ int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
this.windowManager = windowManager;
streamToTs = new ConcurrentHashMap<>();
executorService = Executors.newSingleThreadScheduledExecutor();
- this.interval = interval;
- this.eventTsLag = eventTsLag;
+ this.interval = intervalMs;
+ this.eventTsLag = eventTsLagMs;
this.inputStreams = inputStreams;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/babf3f1d/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index a051a6b..76fdf33 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -28,7 +28,6 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Time;
import org.apache.storm.windowing.TupleWindow;
import org.junit.Before;
import org.junit.Test;
@@ -52,7 +51,7 @@ import static org.junit.Assert.fail;
* Unit tests for {@link WindowedBoltExecutor}
*/
public class WindowedBoltExecutorTest {
-
+
private WindowedBoltExecutor executor;
private TestWindowedBolt testWindowedBolt;
@@ -124,8 +123,8 @@ public class WindowedBoltExecutorTest {
@Test
public void testExecuteWithTs() throws Exception {
- long[] timstamps = {603, 605, 607, 618, 626, 636};
- for (long ts : timstamps) {
+ long[] timestamps = {603, 605, 607, 618, 626, 636};
+ for (long ts : timestamps) {
executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
}
//Thread.sleep(120);
@@ -172,7 +171,7 @@ public class WindowedBoltExecutorTest {
@Test
- public void testPrepareLateTUpleStreamWithoutBuilder() throws Exception {
+ public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
@@ -209,20 +208,21 @@ public class WindowedBoltExecutorTest {
conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
- conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+ //Trigger manually to avoid timing issues
+ conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000);
executor.prepare(conf, context, outputCollector);
- long[] timstamps = {603, 605, 607, 618, 626, 636, 600};
- List<Tuple> tuples = new ArrayList<>(timstamps.length);
+ long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+ List<Tuple> tuples = new ArrayList<>(timestamps.length);
- executor.waterMarkEventGenerator.run();
- for (long ts : timstamps) {
+ for (long ts : timestamps) {
Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
tuples.add(tuple);
executor.execute(tuple);
- Time.sleep(10);
- }
-
+
+ //Update the watermark to this timestamp
+ executor.waterMarkEventGenerator.run();
+ }
System.out.println(testWindowedBolt.tupleWindows);
Tuple tuple = tuples.get(tuples.size() - 1);
Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
[2/2] storm git commit: Added STORM-2516 to CHANGELOG.md
Posted by ar...@apache.org.
Added STORM-2516 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4ed0b54d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4ed0b54d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4ed0b54d
Branch: refs/heads/1.x-branch
Commit: 4ed0b54d1fa3a00a5f20fab42f644fb3015f45c2
Parents: babf3f1
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue May 30 15:56:14 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue May 30 15:56:14 2017 +0530
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4ed0b54d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 004d888..0d28a2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.1
+ * STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder
* STORM-2489: Overlap and data loss on WindowedBolt based on Duration
* STORM-2528: Bump log4j version to 2.8.2
* STORM-2527: Initialize java.sql.DriverManager earlier to avoid deadlock