Posted to commits@storm.apache.org by ar...@apache.org on 2016/06/09 05:57:07 UTC

[1/2] storm git commit: STORM-1873 Configurable behaviour for late tuples

Repository: storm
Updated Branches:
  refs/heads/1.x-branch e99961311 -> 81b398d6b


STORM-1873 Configurable behaviour for late tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c49b6f7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c49b6f7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c49b6f7d

Branch: refs/heads/1.x-branch
Commit: c49b6f7d94d05b2872cd0edd6f1becef1ac1aae5
Parents: e999613
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Thu Jun 2 17:43:10 2016 +0200
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jun 9 11:25:04 2016 +0530

----------------------------------------------------------------------
 docs/Windowing.md                               | 24 +++++-
 .../clj/org/apache/storm/daemon/executor.clj    |  1 +
 storm-core/src/jvm/org/apache/storm/Config.java |  8 ++
 .../storm/topology/WindowedBoltExecutor.java    | 25 +++++-
 .../topology/base/BaseStatefulWindowedBolt.java |  8 ++
 .../storm/topology/base/BaseWindowedBolt.java   | 14 ++++
 .../topology/WindowedBoltExecutorTest.java      | 87 +++++++++++++++++++-
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/docs/Windowing.md
----------------------------------------------------------------------
diff --git a/docs/Windowing.md b/docs/Windowing.md
index 9f5869f..82212ea 100644
--- a/docs/Windowing.md
+++ b/docs/Windowing.md
@@ -149,10 +149,6 @@ The value for the above `fieldName` will be looked up from the incoming tuple an
 If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
 can also be specified which indicates the max time limit for tuples with out of order timestamps. 
 
-E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple
-arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. Currently the late
- tuples are just logged in the worker log files at INFO level.
-
 ```java
 /**
 * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
@@ -163,6 +159,26 @@ arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, i
 public BaseWindowedBolt withLag(Duration duration)
 ```
 
+For example, if the lag is 5 secs and a tuple `t1` arrives with timestamp `06:00:05`, no tuple may arrive with a timestamp earlier than `06:00:00`. If a tuple
+with timestamp `05:59:59` arrives after `t1` and the window has moved past `t1`, it will be treated as a late tuple. By default, late tuples are not processed;
+they are only logged in the worker log files at INFO level.
+
+```java
+/**
+ * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+ * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+ * It must be defined on a per-component basis, and in conjunction with the
+ * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+ *
+ * @param streamId the name of the stream used to emit late tuples on
+ */
+public BaseWindowedBolt withLateTupleStream(String streamId)
+```
+
+The default behaviour can be changed with `withLateTupleStream`. Late tuples are then emitted on the given stream, each wrapped in a single-field tuple whose
+`WindowedBoltExecutor.LATE_TUPLE_FIELD` value is the original input tuple.
+
+
 ### Watermarks
 For processing tuples with timestamp field, storm internally computes watermarks based on the incoming tuple timestamp. Watermark is 
 the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level this is similar to the watermark concept
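The lag and watermark rules described in the documentation above can be illustrated with a small standalone simulation. This is a simplified sketch, not Storm's `WaterMarkEventGenerator`: the class `LateTupleSimulator` and its methods are hypothetical, the watermark is advanced by an explicit call instead of a timer, and timestamps are plain milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of watermark-based late tuple detection
 * (not Storm's WaterMarkEventGenerator). The watermark is the minimum of the
 * latest timestamps seen across all input streams, minus the lag.
 */
final class LateTupleSimulator {
    private final long lagMs;
    // Latest timestamp observed on each input stream.
    private final Map<String, Long> latestTsPerStream = new HashMap<>();
    // Only moves forward; starts "open" so nothing is late before the first watermark.
    private long lastWatermark = Long.MIN_VALUE;

    LateTupleSimulator(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Records a tuple timestamp and returns true if it is on time, false if it is late. */
    boolean track(String streamId, long ts) {
        latestTsPerStream.merge(streamId, ts, Long::max);
        return ts >= lastWatermark;
    }

    /** Recomputes the watermark; in Storm this happens periodically. */
    long advanceWatermark() {
        if (!latestTsPerStream.isEmpty()) {
            long minLatest = Long.MAX_VALUE;
            for (long ts : latestTsPerStream.values()) {
                minLatest = Math.min(minLatest, ts);
            }
            lastWatermark = Math.max(lastWatermark, minLatest - lagMs);
        }
        return lastWatermark;
    }
}
```

With a 5 s lag, tracking `06:00:05` and advancing the watermark gives `06:00:00`; a later `05:59:59` tuple is then reported as late, while `06:00:00` itself is still accepted. With several input streams, the slowest stream holds the watermark back.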

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8c835d5..d50e494 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -173,6 +173,7 @@
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
                         TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                        TOPOLOGY-BOLTS-LATE-TUPLE-STREAM
                         TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS
                         TOPOLOGY-BOLTS-MESSAGE-ID-FIELD-NAME
                         TOPOLOGY-STATE-PROVIDER
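This hunk adds the new key to a list of windowing-related keys in `executor.clj`. The surrounding function is not part of the diff; my understanding is that the list names the Storm config keys a component may set in its own (per-bolt) configuration, and that other Storm keys found in a component config are discarded before it is merged over the topology config. Under that assumption, a value set by `withLateTupleStream` would not reach `WindowedBoltExecutor.prepare` without this one-line change. The Java sketch below models the assumed rule; `ComponentConfMerger` and its two key sets are hypothetical, only the two `topology.bolts.*` strings come from this commit, and `topology.workers` stands in for any other framework key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of merging a component's own configuration over the
 * topology configuration (assumed behaviour, not Storm code): framework keys
 * that are not on the per-component allowlist are dropped from the component
 * config; everything else overrides the topology value.
 */
final class ComponentConfMerger {
    // Illustrative subset of all framework config keys.
    static final Set<String> FRAMEWORK_KEYS = new HashSet<>(Arrays.asList(
            "topology.workers",
            "topology.bolts.tuple.timestamp.field.name",
            "topology.bolts.late.tuple.stream"));

    // Illustrative subset of the keys a component may set for itself.
    static final Set<String> COMPONENT_OVERRIDABLE = new HashSet<>(Arrays.asList(
            "topology.bolts.tuple.timestamp.field.name",
            "topology.bolts.late.tuple.stream"));

    static Map<String, Object> merge(Map<String, Object> topologyConf,
                                     Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        for (Map.Entry<String, Object> entry : componentConf.entrySet()) {
            String key = entry.getKey();
            boolean dropped = FRAMEWORK_KEYS.contains(key) && !COMPONENT_OVERRIDABLE.contains(key);
            if (!dropped) {
                merged.put(key, entry.getValue());
            }
        }
        return merged;
    }
}
```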

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 21aa010..d4a5125 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1874,6 +1874,14 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME = "topology.bolts.tuple.timestamp.field.name";
 
+    /**
+     * Bolt-specific configuration for windowed bolts to specify the name of the stream on which late tuples are
+     * going to be emitted. This configuration should only be set through the BaseWindowedBolt.withLateTupleStream builder
+     * method and not as a global parameter; otherwise an IllegalArgumentException is thrown.
+     */
+    @isString
+    public static final String TOPOLOGY_BOLTS_LATE_TUPLE_STREAM = "topology.bolts.late.tuple.stream";
+
     /*
      * Bolt-specific configuration for windowed bolts to specify the maximum time lag of the tuple timestamp
      * in milliseconds. It means that the tuple timestamps cannot be out of order by more than this amount.
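The Javadoc above warns against setting this key globally. The checks this commit adds to `WindowedBoltExecutor.prepare` show why: a late-tuple stream is rejected when no timestamp field is configured, and also when the stream is not among the component's declared streams. Since `declareOutputFields` reads the bolt's own component configuration, a value set only in the global topology configuration does not make the executor declare the stream. Below is a standalone sketch of the two checks; `LateStreamValidator` is a hypothetical name, the key strings and exception messages are copied from the diff, and the `declaredStreams` parameter stands in for `TopologyContext.getThisStreams()`.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical extraction of the late-stream checks performed in prepare(). */
final class LateStreamValidator {
    static final String TS_FIELD_KEY = "topology.bolts.tuple.timestamp.field.name";
    static final String LATE_STREAM_KEY = "topology.bolts.late.tuple.stream";

    /** Returns the configured late-tuple stream id, or null if none is set. */
    static String validate(Map<String, Object> conf, Set<String> declaredStreams) {
        if (conf.containsKey(TS_FIELD_KEY)) {
            String lateStream = (String) conf.get(LATE_STREAM_KEY);
            // The stream must have been declared for this component (via the builder).
            if (lateStream != null && !declaredStreams.contains(lateStream)) {
                throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
            }
            return lateStream;
        }
        // Without a timestamp field there is no notion of a late tuple.
        if (conf.containsKey(LATE_STREAM_KEY)) {
            throw new IllegalArgumentException(
                    "Late tuple stream can be defined only when specifying a timestamp field");
        }
        return null;
    }
}
```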

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index 9439ef2..fe0f4da 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -23,7 +23,9 @@ import org.apache.storm.spout.CheckpointSpout;
 import org.apache.storm.task.IOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.windowing.CountEvictionPolicy;
 import org.apache.storm.windowing.CountTriggerPolicy;
 import org.apache.storm.windowing.EvictionPolicy;
@@ -57,12 +59,14 @@ public class WindowedBoltExecutor implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
     private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
     private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
+    public static final String LATE_TUPLE_FIELD = "late_tuple";
     private final IWindowedBolt bolt;
     private transient WindowedOutputCollector windowedOutputCollector;
     private transient WindowLifecycleListener<Tuple> listener;
     private transient WindowManager<Tuple> windowManager;
     private transient int maxLagMs;
     private transient String tupleTsFieldName;
+    private transient String lateTupleStream;
     private transient TriggerPolicy<Tuple> triggerPolicy;
     private transient EvictionPolicy<Tuple> evictionPolicy;
     // package level for unit tests
@@ -163,6 +167,13 @@ public class WindowedBoltExecutor implements IRichBolt {
         // tuple ts
         if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME)) {
             tupleTsFieldName = (String) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME);
+            // late tuple stream
+            lateTupleStream = (String) stormConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+            if (lateTupleStream != null) {
+                if (!context.getThisStreams().contains(lateTupleStream)) {
+                    throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
+                }
+            }
             // max lag
             if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                 maxLagMs = ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
@@ -178,6 +189,10 @@ public class WindowedBoltExecutor implements IRichBolt {
             }
             waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                     maxLagMs, getComponentStreams(context));
+        } else {
+            if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
+                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
+            }
         }
         // validate
         validate(stormConf, windowLengthCount, windowLengthDuration,
@@ -268,8 +283,12 @@ public class WindowedBoltExecutor implements IRichBolt {
             if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                 windowManager.add(input, ts);
             } else {
+                if (lateTupleStream != null) {
+                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
+                } else {
+                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
+                }
                 windowedOutputCollector.ack(input);
-                LOG.info("Received a late tuple {} with ts {}. This will not processed.", input, ts);
             }
         } else {
             windowManager.add(input);
@@ -284,6 +303,10 @@ public class WindowedBoltExecutor implements IRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
+        if (lateTupleStream != null) {
+            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
+        }
         bolt.declareOutputFields(declarer);
     }
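The `execute` change above handles a late tuple in one of two ways and acks it in both cases, while `declareOutputFields` declares the late stream with the single field `LATE_TUPLE_FIELD` (`"late_tuple"`), whose value is the original input tuple. The sketch below models that decision without Storm types; `LateTupleRouter`, its `Collector` interface and `RecordingCollector` are hypothetical stand-ins for `WindowedOutputCollector`, and plain objects stand in for `Tuple`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of late tuple handling after this change (not Storm code). */
final class LateTupleRouter {
    /** Stand-in for the emit/ack subset of the output collector. */
    interface Collector {
        void emit(String streamId, Object anchor, List<Object> values);
        void ack(Object input);
    }

    /** Test double that records calls instead of sending tuples anywhere. */
    static final class RecordingCollector implements Collector {
        final List<String> emitted = new ArrayList<>();
        final List<Object> acked = new ArrayList<>();

        @Override
        public void emit(String streamId, Object anchor, List<Object> values) {
            emitted.add(streamId + ":" + values);
        }

        @Override
        public void ack(Object input) {
            acked.add(input);
        }
    }

    private final String lateTupleStream; // null: log the tuple instead of emitting it
    final List<String> log = new ArrayList<>();

    LateTupleRouter(String lateTupleStream) {
        this.lateTupleStream = lateTupleStream;
    }

    void onLateTuple(Object input, long ts, Collector collector) {
        if (lateTupleStream != null) {
            // Anchored emit; the single value (the LATE_TUPLE_FIELD) is the input itself.
            collector.emit(lateTupleStream, input, Collections.singletonList(input));
        } else {
            log.add("Received a late tuple " + input + " with ts " + ts + ". This will not be processed.");
        }
        // Acked in both branches, as in the diff, so the input does not stay pending.
        collector.ack(input);
    }
}
```

Because the emit is anchored to the input, a downstream bolt consuming the late stream joins the same tuple tree as the original tuple.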
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
index 850904b..0c35b9d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
@@ -103,6 +103,14 @@ public abstract class BaseStatefulWindowedBolt<T extends State> extends BaseWind
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BaseStatefulWindowedBolt<T> withLateTupleStream(String streamName) {
+        super.withLateTupleStream(streamName);
+        return this;
+    }
 
     /**
      * {@inheritDoc}
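`BaseStatefulWindowedBolt` overrides `withLateTupleStream` only to narrow the return type, so that a builder chain started on a stateful bolt keeps its `BaseStatefulWindowedBolt<T>` type (Java permits covariant return types in overrides). A minimal illustration with hypothetical classes `SketchWindowedBolt` and `SketchStatefulWindowedBolt`, not Storm's; the subclass-only `withLabel` method is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for BaseWindowedBolt's fluent configuration methods. */
class SketchWindowedBolt {
    final Map<String, Object> windowConfiguration = new HashMap<>();

    SketchWindowedBolt withLateTupleStream(String streamId) {
        windowConfiguration.put("topology.bolts.late.tuple.stream", streamId);
        return this;
    }
}

/** Hypothetical stand-in for BaseStatefulWindowedBolt<T>. */
class SketchStatefulWindowedBolt<T> extends SketchWindowedBolt {
    String label; // hypothetical subclass-only setting

    // Covariant override: delegates to the parent but returns the subclass type.
    @Override
    SketchStatefulWindowedBolt<T> withLateTupleStream(String streamId) {
        super.withLateTupleStream(streamId);
        return this;
    }

    // Hypothetical method that exists only on the subclass.
    SketchStatefulWindowedBolt<T> withLabel(String label) {
        this.label = label;
        return this;
    }
}
```

A chain such as `new SketchStatefulWindowedBolt<String>().withLateTupleStream("late").withLabel("counts")` compiles only because of the override; without it, `withLateTupleStream` would return the parent type, which has no `withLabel`.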

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
index e1725dd..6a0a93c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
@@ -255,6 +255,20 @@ public abstract class BaseWindowedBolt implements IWindowedBolt {
     }
 
     /**
+     * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the
+     * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field.
+     * It must be defined on a per-component basis, and in conjunction with the
+     * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown.
+     *
+     * @param streamId the name of the stream used to emit late tuples on
+     */
+    public BaseWindowedBolt withLateTupleStream(String streamId) {
+        windowConfiguration.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, streamId);
+        return this;
+    }
+
+
+    /**
      * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
      * cannot be out of order by more than this amount.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/c49b6f7d/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 0ba1e24..f6822ce 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -28,18 +28,21 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
 import org.apache.storm.windowing.TupleWindow;
-import org.apache.storm.windowing.WaterMarkEvent;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 /**
@@ -87,6 +90,7 @@ public class WindowedBoltExecutorTest {
 
     private TopologyContext getTopologyContext() {
         TopologyContext context = Mockito.mock(TopologyContext.class);
+
         Map<GlobalStreamId, Grouping> sources = Collections.singletonMap(
                 new GlobalStreamId("s1", "default"),
                 null
@@ -139,4 +143,85 @@ public class WindowedBoltExecutorTest {
         assertArrayEquals(new long[]{618, 626},
                           new long[]{(long) third.get().get(0).getValue(0), (long)third.get().get(1).getValue(0)});
     }
+
+    @Test
+    public void testPrepareLateTupleStreamWithoutTs() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        // emulate the call of withLateTupleStream method
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Late tuple stream can be defined only when specifying a timestamp field"));
+        }
+    }
+
+
+    @Test
+    public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        try {
+            executor.prepare(conf, context, getOutputCollector());
+            fail();
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), is("Stream for late tuples must be defined with the builder method withLateTupleStream"));
+        }
+    }
+
+
+    @Test
+    public void testExecuteWithLateTupleStream() throws Exception {
+        testWindowedBolt = new TestWindowedBolt();
+        executor = new WindowedBoltExecutor(testWindowedBolt);
+        TopologyContext context = getTopologyContext();
+        Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default", "$late")));
+
+        OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
+        conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20);
+        conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10);
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "ts");
+        conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
+        conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5);
+        conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10);
+        executor.prepare(conf, context, outputCollector);
+
+        long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
+        List<Tuple> tuples = new ArrayList<>(timestamps.length);
+
+        executor.waterMarkEventGenerator.run();
+        for (long ts : timestamps) {
+            Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+            tuples.add(tuple);
+            executor.execute(tuple);
+            Time.sleep(10);
+        }
+
+        System.out.println(testWindowedBolt.tupleWindows);
+        Tuple tuple = tuples.get(tuples.size() - 1);
+        Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
+    }
 }
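`testExecuteWithLateTupleStream` feeds the timestamps 603, 605, 607, 618, 626, 636 and 600 with a 5 ms lag and expects only the last tuple to be emitted on `$late`. The sketch below reproduces that expectation deterministically, under the simplifying assumption that the watermark is recomputed after every tuple (the real test relies on the watermark generator running periodically while the loop sleeps 10 ms between tuples). `LateTupleReplay` is a hypothetical helper for a single input stream.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical replay of testExecuteWithLateTupleStream for one input stream,
 * assuming the watermark is recomputed after every tuple.
 */
final class LateTupleReplay {
    static List<Long> lateTimestamps(long[] timestamps, long lagMs) {
        List<Long> late = new ArrayList<>();
        long latest = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // A tuple is late if it is older than the watermark computed so far.
            if (ts < watermark) {
                late.add(ts);
            }
            latest = Math.max(latest, ts);
            // The watermark trails the newest timestamp by the lag and never moves back.
            watermark = Math.max(watermark, latest - lagMs);
        }
        return late;
    }
}
```

With the test's inputs only `600` comes out late: by the time it arrives the newest timestamp is `636`, so the watermark is `631`.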


[2/2] storm git commit: Added STORM-1873 to Changelog

Posted by ar...@apache.org.
Added STORM-1873 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81b398d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81b398d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81b398d6

Branch: refs/heads/1.x-branch
Commit: 81b398d6ba1960cdaed947de8f839534f22fc2c5
Parents: c49b6f7
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Thu Jun 9 11:25:41 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Thu Jun 9 11:25:41 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81b398d6/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ca316b2..f0ed69a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1873: Implement alternative behaviour for late tuples
  * STORM-1719: Introduce REST API: Topology metric stats for stream
  * STORM-1887: Fixed help message for set_log_level command
  * STORM-1878: Flux can now handle IStatefulBolts