You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:22 UTC
[6/6] apex-malhar git commit: APEXMALHAR-2142 #comment Implement
WindowedStream interface
APEXMALHAR-2142 #comment Implement WindowedStream interface
Add Accumulation interface support to High-Level API
Add name support for all the windowed transforms
Flex types in Composite transformation
Add more documents and logs
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/266b0411
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/266b0411
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/266b0411
Branch: refs/heads/master
Commit: 266b04116760dbd4d5cad6b4102b06153ac96a5f
Parents: 17f6c55
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jul 12 11:57:09 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Aug 25 09:26:03 2016 -0700
----------------------------------------------------------------------
demos/highlevelapi/pom.xml | 55 ++
demos/highlevelapi/src/assemble/appPackage.xml | 59 ++
.../sample/cookbook/CombinePerKeyExamples.java | 237 ++++++++
.../stream/sample/cookbook/TriggerExample.java | 578 +++++++++++++++++++
demos/pom.xml | 12 +
stream/pom.xml | 1 -
.../apex/malhar/stream/api/ApexStream.java | 144 ++---
.../stream/api/CompositeStreamTransform.java | 30 +
.../apache/apex/malhar/stream/api/Option.java | 122 ++++
.../apex/malhar/stream/api/WindowedStream.java | 150 +++++
.../malhar/stream/api/function/Function.java | 36 +-
.../malhar/stream/api/impl/ApexStreamImpl.java | 194 +++----
.../stream/api/impl/ApexWindowedStreamImpl.java | 275 +++++++++
.../apex/malhar/stream/api/impl/DagMeta.java | 53 +-
.../malhar/stream/api/impl/IDGenerator.java | 3 +
.../malhar/stream/api/impl/StreamFactory.java | 76 ++-
.../stream/api/impl/TupleWrapperOperator.java | 192 ++++++
.../stream/api/impl/accumulation/Count.java | 61 ++
.../stream/api/impl/accumulation/FoldFn.java | 65 +++
.../stream/api/impl/accumulation/ReduceFn.java | 65 +++
.../stream/api/impl/accumulation/TopN.java | 107 ++++
.../api/operator/AnnonymousClassModifier.java | 3 +
.../api/operator/ByteArrayClassLoader.java | 3 +
.../stream/api/operator/FunctionOperator.java | 112 ++--
.../apex/malhar/stream/api/util/KeyedTuple.java | 34 --
.../apex/malhar/stream/api/util/TupleUtil.java | 21 +-
.../FunctionOperator/FunctionOperatorTest.java | 4 +-
.../stream/api/impl/ApexStreamImplTest.java | 6 +-
.../stream/sample/ApplicationWithStreamAPI.java | 21 +-
.../LocalTestWithoutStreamApplication.java | 54 +-
.../apex/malhar/stream/sample/MyStream.java | 3 +-
.../apex/malhar/stream/sample/MyStreamTest.java | 87 ++-
.../malhar/stream/sample/TupleCollector.java | 2 +
.../apex/malhar/stream/sample/WCInput.java | 90 +++
.../stream/sample/WordCountWithStreamAPI.java | 72 +++
stream/src/test/resources/sampletweets.txt | 207 +++++++
36 files changed, 2839 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml
new file mode 100644
index 0000000..c669681
--- /dev/null
+++ b/demos/highlevelapi/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>high-level-api-demo</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar High-Level API Demo</name>
+ <description>Apex demo applications that use High-level API to construct a dag</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-demos</artifactId>
+ <version>3.5.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/assemble/appPackage.xml b/demos/highlevelapi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/highlevelapi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..5d4c628
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * comma-separated list of play names in which that word appears.
+ *
+ * <p>Concepts: the combine transform, which lets you combine the values in a
+ * key-grouped collection.
+ */
+public class CombinePerKeyExamples
+{
+  // Use the shakespeare public BigQuery sample
+  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 9;
+
+  /**
+   * Examines each row in the input table. If the word is at least {@code MIN_WORD_LENGTH}
+   * characters long, outputs (word, play_name); otherwise returns {@code null}.
+   */
+  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
+  {
+    @Override
+    public KeyValPair<String, String> f(SampleBean input)
+    {
+      String word = input.getWord();
+      // A row without a word cannot meet the length threshold, so treat it like a short word
+      // instead of failing with a NullPointerException.
+      if (word == null || word.length() < MIN_WORD_LENGTH) {
+        // NOTE(review): null results still flow downstream; a filter step would be cleaner.
+        return null;
+      }
+      return new KeyValPair<>(word, input.getCorpus());
+    }
+  }
+
+  /**
+   * Converts a windowed (word, play list) pair into a {@link SampleBean} whose
+   * {@code all_plays} field holds the play list; {@code corpus} is left {@code null}.
+   */
+  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
+  {
+    @Override
+    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
+    {
+      KeyValPair<String, String> wordAndPlays = input.getValue();
+      return new SampleBean(wordAndPlays.getKey(), wordAndPlays.getValue(), null);
+    }
+  }
+
+  /**
+   * Reads the public 'Shakespeare' data, and for each word in the dataset
+   * over a given length, generates a string containing the list of play names
+   * in which that word appears.
+   */
+  static class PlaysForWord
+      extends CompositeStreamTransform<SampleBean, SampleBean>
+  {
+    /**
+     * Joins two comma-separated play lists. Empty or {@code null} sides are skipped so the
+     * empty default accumulated value does not leave a leading or trailing comma.
+     *
+     * @param left  the first play list, possibly empty
+     * @param right the second play list, possibly empty
+     * @return the combined, comma-separated play list
+     */
+    static String joinPlays(String left, String right)
+    {
+      if (left == null || left.isEmpty()) {
+        return right;
+      }
+      if (right == null || right.isEmpty()) {
+        return left;
+      }
+      return left + "," + right;
+    }
+
+    @Override
+    public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+    {
+      return inputStream.map(new ExtractLargeWordsFn())
+          .window(new WindowOption.GlobalWindow())
+          .reduceByKey(new ReduceFn<String>()
+          {
+            @Override
+            public String defaultAccumulatedValue()
+            {
+              return "";
+            }
+
+            @Override
+            public String accumulate(String accumulatedValue, String input)
+            {
+              return joinPlays(accumulatedValue, input);
+            }
+
+            @Override
+            public String merge(String accumulatedValue1, String accumulatedValue2)
+            {
+              return joinPlays(accumulatedValue1, accumulatedValue2);
+            }
+
+            @Override
+            public String getOutput(String accumulatedValue)
+            {
+              return accumulatedValue;
+            }
+
+            @Override
+            public String getRetraction(String value)
+            {
+              return value;
+            }
+          }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
+            {
+              // Wrap each (word, play) pair in a tuple for the windowed reduce; the previous
+              // placeholder returned null for every element.
+              return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
+            }
+          })
+          .map(new FormatShakespeareOutputFn());
+    }
+  }
+
+  /**
+   * Simple bean holding a word, the plays it appears in, and the corpus (play) it came from.
+   */
+  public static class SampleBean
+  {
+    public SampleBean()
+    {
+    }
+
+    public SampleBean(String word, String all_plays, String corpus)
+    {
+      this.word = word;
+      this.all_plays = all_plays;
+      this.corpus = corpus;
+    }
+
+    private String word;
+
+    private String all_plays;
+
+    private String corpus;
+
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    public String getWord()
+    {
+      return word;
+    }
+
+    public void setCorpus(String corpus)
+    {
+      this.corpus = corpus;
+    }
+
+    public String getCorpus()
+    {
+      return corpus;
+    }
+
+    public void setAll_plays(String all_plays)
+    {
+      this.all_plays = all_plays;
+    }
+
+    public String getAll_plays()
+    {
+      return all_plays;
+    }
+  }
+
+  /**
+   * Placeholder input operator: every lifecycle method is empty, so it currently emits no tuples.
+   */
+  public static class SampleInput implements InputOperator
+  {
+    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort<>();
+
+    @Override
+    public void emitTuples()
+    {
+    }
+
+    @Override
+    public void beginWindow(long l)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    SampleInput input = new SampleInput();
+    // Only composes the stream; nothing in this method launches the resulting application.
+    StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord());
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
new file mode 100644
index 0000000..903f624
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into {@link WindowOption windows} to be processed, and demonstrates using various kinds of
+ * {@link TriggerOption triggers} to control when the results for
+ * each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ * 1. The default triggering behavior
+ * 2. Late data with the default trigger
+ * 3. How to get speculative estimates
+ * 4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data',
+ * See: <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger,
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
+ * the window duration, until the first pane of non-late data has been emitted, to see more
+ * interesting results.
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data i.e. dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p>To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processing_time}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processing_time}
+ *
+ * <p> To see speculative results every five minutes after the end of the window
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query we can add additional where clauses.
+ * For example, to see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exits.
+ */
+
+public class TriggerExample
+{
+  /** Numeric value of the fixed window duration, in minutes. */
+  public static final int WINDOW_DURATION = 30;
+
+  // Trigger constants. Shrinking ONE_MINUTE or FIVE_MINUTES yields earlier approximations of results.
+
+  /** Processing-time firing interval, used only before the end of the window. */
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+
+  /** Processing-time firing interval, used only after the end of the window. */
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+
+  /** How late a data element may arrive and still be accepted. */
+  public static final Duration ONE_DAY = Duration.standardDays(1);
+
+  /**
+   * This transform demonstrates using triggers to control when data is produced for each window.
+   * Consider an example to understand the results generated by each type of trigger.
+   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+   * element and processing time is the time when the data element gets processed in the pipeline.
+   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+   * Key (freeway) | Value (total_flow) | event time | processing time
+   * 5 | 50 | 10:00:03 | 10:00:47
+   * 5 | 30 | 10:01:00 | 10:01:03
+   * 5 | 30 | 10:02:00 | 11:07:00
+   * 5 | 20 | 10:04:10 | 10:05:15
+   * 5 | 60 | 10:05:00 | 11:03:00
+   * 5 | 20 | 10:05:01 | 11:07:30
+   * 5 | 60 | 10:15:00 | 10:27:15
+   * 5 | 40 | 10:26:40 | 10:26:43
+   * 5 | 60 | 10:27:20 | 10:27:25
+   * 5 | 60 | 10:29:00 | 11:11:00
+   *
+   * <p> The system tracks a watermark which records up to what point in event time the data is
+   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+   * behind the current processing time. In practice, the actual value would vary over time based
+   * on the system's knowledge of the current input delay and contents of the backlog (data
+   * that has not yet been processed).
+   *
+   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+   * close at 10:44:59, when the watermark passes 10:30:00.
+   */
+  static class CalculateTotalFlow
+      extends CompositeStreamTransform<String, SampleBean>
+  {
+    private final int windowDuration;
+
+    CalculateTotalFlow(int windowDuration)
+    {
+      this.windowDuration = windowDuration;
+    }
+
+    /**
+     * Creates a new fixed time-window option of {@code windowDuration} minutes. Each concept gets
+     * its own instance, as in the original code, so no option object is shared between streams.
+     */
+    private WindowOption.TimeWindows fixedWindow()
+    {
+      return new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration));
+    }
+
+    @Override
+    public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+    {
+      // Concept #1: The default triggering behavior
+      // By default a trigger fires when the watermark has passed the end of the window.
+      // In Beam this would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+      // The system also defaults to dropping late data -- data which arrives after the watermark
+      // has passed the event timestamp of the arriving element. This means that the default trigger
+      // will only fire once.
+
+      // Each pane produced by the default trigger with no allowed lateness will be the first and
+      // last pane in the window, and will be ON_TIME.
+
+      // The results for the example above with the default trigger and zero allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5 | 260 | 6 | true | true | ON_TIME
+
+      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+      // late, and dropped.
+      ApexStream<SampleBean> defaultTriggerResults = inputStream
+          .window(fixedWindow(), new TriggerOption().discardingFiredPanes())
+          .addCompositeStreams(new TotalFlow("default"));
+
+      // Concept #2: Late data with the default trigger
+      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+      // window. Any late data will result in an additional pane being fired for that same window.
+
+      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+      // In Beam, to definitely get the last pane when the window closes, use
+      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+      // The results for the example above with the default trigger and ONE_DAY allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5 | 260 | 6 | true | false | ON_TIME
+      // 5 | 60 | 1 | false | false | LATE
+      // 5 | 30 | 1 | false | false | LATE
+      // 5 | 20 | 1 | false | false | LATE
+      // 5 | 60 | 1 | false | false | LATE
+      ApexStream<SampleBean> withAllowedLatenessResults = inputStream
+          .window(fixedWindow(), new TriggerOption().discardingFiredPanes(), ONE_DAY)
+          .addCompositeStreams(new TotalFlow("withAllowedLateness"));
+
+      // Concept #3: How to get speculative estimates
+      // We can specify a trigger that fires independent of the watermark, for instance after
+      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+      // all the data is available. Since we don't have any triggers that depend on the watermark
+      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+      // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5 | 80 | 2 | true | false | EARLY
+      // 5 | 100 | 3 | false | false | EARLY
+      // 5 | 260 | 6 | false | false | EARLY
+      // 5 | 320 | 7 | false | false | LATE
+      // 5 | 370 | 9 | false | false | LATE
+      // 5 | 430 | 10 | false | false | LATE
+      ApexStream<SampleBean> speculativeResults = inputStream
+          .window(fixedWindow(),
+              // Trigger fires every minute
+              new TriggerOption().withEarlyFiringsAtEvery(ONE_MINUTE)
+                  // After emitting each pane, it will continue accumulating the elements so that each
+                  // approximation includes all of the previous data in addition to the newly arrived
+                  // data.
+                  .accumulatingFiredPanes(),
+              ONE_DAY)
+          .addCompositeStreams(new TotalFlow("speculative"));
+
+      // Concept #4: Combining late data and speculative estimates
+      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+      // and LATE updates based on late data.
+
+      // Each time a triggering condition is satisfied it advances to the next trigger.
+      // If there are new elements this trigger emits a window under following condition:
+      // > Early approximations every minute till the end of the window.
+      // > An on-time firing when the watermark has passed the end of the window
+      // > Every five minutes of late data.
+
+      // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5 | 80 | 2 | true | false | EARLY
+      // 5 | 100 | 3 | false | false | EARLY
+      // 5 | 260 | 6 | false | false | EARLY
+      // [First pane fired after the end of the window]
+      // 5 | 320 | 7 | false | false | ON_TIME
+      // 5 | 430 | 10 | false | false | LATE
+
+      // For more possibilities of how to build advanced triggers, see TriggerOption.
+      ApexStream<SampleBean> sequentialResults = inputStream
+          .window(fixedWindow(),
+              // Speculative every ONE_MINUTE, late firings every FIVE_MINUTES
+              new TriggerOption().withEarlyFiringsAtEvery(ONE_MINUTE)
+                  .withLateFiringsAtEvery(FIVE_MINUTES)
+                  // After emitting each pane, it will continue accumulating the elements so that each
+                  // approximation includes all of the previous data in addition to the newly arrived
+                  // data.
+                  .accumulatingFiredPanes(),
+              ONE_DAY)
+          .addCompositeStreams(new TotalFlow("sequential"));
+
+      // NOTE(review): only sequentialResults is returned; defaultTriggerResults,
+      // withAllowedLatenessResults and speculativeResults are built but never referenced again.
+      // Whether those branches still end up in the DAG depends on the stream implementation -- confirm.
+      return sequentialResults;
+    }
+  }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The remaining parts of the pipeline are needed to produce the output for each
+ // concept above. Not directly relevant to understanding the trigger examples.
+
+ /**
+ * Calculate total flow and number of records for each freeway and format the results to TableRow
+ * objects, to save to BigQuery.
+ */
+ static class TotalFlow extends
+ CompositeStreamTransform<String, SampleBean>
+ {
+ private String triggerType;
+
+ public TotalFlow(String triggerType)
+ {
+ this.triggerType = triggerType;
+ }
+
+ @Override
+ public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+ {
+ if (!(inputStream instanceof WindowedStream)) {
+ throw new RuntimeException("Not supported here");
+ }
+ WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream;
+ ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream
+ .groupByKey(new ExtractFlowInfo());
+
+ return flowPerFreeway
+ .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
+ {
+ @Override
+ public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
+ {
+ Iterable<Integer> flows = input.getValue();
+ Integer sum = 0;
+ Long numberOfRecords = 0L;
+ for (Integer value : flows) {
+ sum += value;
+ numberOfRecords++;
+ }
+ return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
+ }
+ })
+ .map(new FormatTotalFlow(triggerType));
+ }
+ }
+
+  /**
+   * Converts a (freeway, "sum,count") pair, as produced by {@link TotalFlow}, into a
+   * {@link SampleBean} tagged with the trigger type and stamped with the current processing time.
+   *
+   * <p>The window, isFirst/isLast, timing and event-time fields are not populated yet: they are
+   * left {@code null}/{@code false} until window metadata is available to the function.
+   */
+  static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
+  {
+    private String triggerType;
+
+    public FormatTotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    /**
+     * @throws IllegalArgumentException if the value is not of the form {@code "sum,count"}
+     * @throws NumberFormatException if either part is not a valid number
+     */
+    @Override
+    public SampleBean f(KeyValPair<String, String> input)
+    {
+      String[] values = input.getValue().split(",");
+      if (values.length != 2) {
+        throw new IllegalArgumentException("Expected \"sum,count\" but got: " + input.getValue());
+      }
+      int totalFlow = Integer.parseInt(values[0]);
+      long numberOfRecords = Long.parseLong(values[1]);
+      //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
+      return new SampleBean(triggerType, input.getKey(), totalFlow, numberOfRecords, null, false, false, null,
+          null, new Date());
+    }
+  }
+
+ /**
+ * Output record for one freeway's total traffic flow, together with trigger and window metadata.
+ * <p>
+ * A mutable bean: it has a public no-arg constructor plus a getter/setter for every field.
+ * {@link #equals(Object)}, {@link #hashCode()} and {@link #toString()} cover all fields.
+ */
+ public static class SampleBean
+ {
+ public SampleBean()
+ {
+ }
+
+ private String trigger_type;
+
+ private String freeway;
+
+ private int total_flow;
+
+ private long number_of_records;
+
+ private String window;
+
+ private boolean isFirst;
+
+ private boolean isLast;
+
+ private Date timing;
+
+ private Date event_time;
+
+ private Date processing_time;
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SampleBean that = (SampleBean)o;
+ return total_flow == that.total_flow &&
+ number_of_records == that.number_of_records &&
+ isFirst == that.isFirst &&
+ isLast == that.isLast &&
+ Objects.equals(trigger_type, that.trigger_type) &&
+ Objects.equals(freeway, that.freeway) &&
+ Objects.equals(window, that.window) &&
+ Objects.equals(timing, that.timing) &&
+ Objects.equals(event_time, that.event_time) &&
+ Objects.equals(processing_time, that.processing_time);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects
+ .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time,
+ processing_time);
+ }
+
+ /**
+ * Renders every field, so beans are readable when printed or logged.
+ */
+ @Override
+ public String toString()
+ {
+ return "SampleBean{" +
+ "trigger_type='" + trigger_type + '\'' +
+ ", freeway='" + freeway + '\'' +
+ ", total_flow=" + total_flow +
+ ", number_of_records=" + number_of_records +
+ ", window='" + window + '\'' +
+ ", isFirst=" + isFirst +
+ ", isLast=" + isLast +
+ ", timing=" + timing +
+ ", event_time=" + event_time +
+ ", processing_time=" + processing_time +
+ '}';
+ }
+
+ public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window,
+ boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time)
+ {
+
+ this.trigger_type = trigger_type;
+ this.freeway = freeway;
+ this.total_flow = total_flow;
+ this.number_of_records = number_of_records;
+ this.window = window;
+ this.isFirst = isFirst;
+ this.isLast = isLast;
+ this.timing = timing;
+ this.event_time = event_time;
+ this.processing_time = processing_time;
+ }
+
+ public String getTrigger_type()
+ {
+ return trigger_type;
+ }
+
+ public void setTrigger_type(String trigger_type)
+ {
+ this.trigger_type = trigger_type;
+ }
+
+ public String getFreeway()
+ {
+ return freeway;
+ }
+
+ public void setFreeway(String freeway)
+ {
+ this.freeway = freeway;
+ }
+
+ public int getTotal_flow()
+ {
+ return total_flow;
+ }
+
+ public void setTotal_flow(int total_flow)
+ {
+ this.total_flow = total_flow;
+ }
+
+ public long getNumber_of_records()
+ {
+ return number_of_records;
+ }
+
+ public void setNumber_of_records(long number_of_records)
+ {
+ this.number_of_records = number_of_records;
+ }
+
+ public String getWindow()
+ {
+ return window;
+ }
+
+ public void setWindow(String window)
+ {
+ this.window = window;
+ }
+
+ public boolean isFirst()
+ {
+ return isFirst;
+ }
+
+ public void setFirst(boolean first)
+ {
+ isFirst = first;
+ }
+
+ public boolean isLast()
+ {
+ return isLast;
+ }
+
+ public void setLast(boolean last)
+ {
+ isLast = last;
+ }
+
+ public Date getTiming()
+ {
+ return timing;
+ }
+
+ public void setTiming(Date timing)
+ {
+ this.timing = timing;
+ }
+
+ public Date getEvent_time()
+ {
+ return event_time;
+ }
+
+ public void setEvent_time(Date event_time)
+ {
+ this.event_time = event_time;
+ }
+
+ public Date getProcessing_time()
+ {
+ return processing_time;
+ }
+
+ public void setProcessing_time(Date processing_time)
+ {
+ this.processing_time = processing_time;
+ }
+ }
+
+ /**
+ * Extract the freeway and total flow in a reading.
+ * Freeway is used as key since we are calculating the total flow for each freeway.
+ * <p>
+ * Implements {@link Function.ToKeyValue} because {@code WindowedStream#groupByKey} takes a
+ * {@code ToKeyValue}, whose result is a {@code Tuple} wrapping the key/value pair.
+ * Returns null for the header row, for rows with fewer than 48 columns, and for rows whose
+ * total flow (column 7) is missing, non-numeric or not positive.
+ */
+ static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
+ {
+ @Override
+ public org.apache.apex.malhar.lib.window.Tuple<KeyValPair<String, Integer>> f(String input)
+ {
+ String[] laneInfo = input.split(",");
+ if (laneInfo[0].equals("timestamp")) {
+ // Header row
+ return null;
+ }
+ if (laneInfo.length < 48) {
+ //Skip the invalid input.
+ return null;
+ }
+ String freeway = laneInfo[2];
+ Integer totalFlow = tryIntegerParse(laneInfo[7]);
+ // Ignore the records with total flow 0 to easily understand the working of triggers.
+ // Skip the records with total flow -1 since they are invalid input.
+ if (totalFlow == null || totalFlow <= 0) {
+ return null;
+ }
+ return new org.apache.apex.malhar.lib.window.Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
+ }
+ }
+
+ // NOTE(review): not referenced anywhere in the visible portion of this file; confirm it is used before keeping it.
+ private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+ /**
+ * Example entry point: reads lines from a folder and applies {@code CalculateTotalFlow(60)} to them.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // NOTE(review): this only builds the stream; nothing here calls run(), and "some folder" is a placeholder path.
+ // NOTE(review): the compose() in this file throws unless its input is a WindowedStream. Confirm that
+ // fromFolder() returns one; otherwise a window(...) call is needed before addCompositeStreams.
+ StreamFactory.fromFolder("some folder")
+ .addCompositeStreams(new CalculateTotalFlow(60));
+
+ }
+
+ /**
+ * Parses a decimal int, yielding null instead of throwing when the text is not a valid int.
+ *
+ * @param number text to parse; a null argument also yields null
+ * @return the parsed value, or null if {@code number} cannot be parsed as an int
+ */
+ private static Integer tryIntegerParse(String number)
+ {
+ Integer parsed;
+ try {
+ parsed = Integer.valueOf(number);
+ } catch (NumberFormatException ignored) {
+ // Unparseable input is an expected case here; callers treat null as "skip this record".
+ parsed = null;
+ }
+ return parsed;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 12f0f14..e9f2daf 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -192,6 +192,7 @@
<module>r</module>
<module>echoserver</module>
<module>iteration</module>
+ <module>highlevelapi</module>
</modules>
<dependencies>
@@ -231,6 +232,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-stream</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/pom.xml
----------------------------------------------------------------------
diff --git a/stream/pom.xml b/stream/pom.xml
index fd663e0..445be92 100755
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -95,6 +95,5 @@
<version>3.2.1</version>
</dependency>
-
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
index 2f65ba9..6d44534 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -18,11 +18,14 @@
*/
package org.apache.apex.malhar.stream.api;
-
-import java.util.Map;
import java.util.concurrent.Callable;
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.DAGContext;
@@ -37,6 +40,7 @@ import com.datatorrent.api.Operator;
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public interface ApexStream<T>
{
/**
@@ -46,17 +50,7 @@ public interface ApexStream<T>
* @param <O> Type of the output
* @return new stream of type O
*/
- <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction);
-
- /**
- * Simple map transformation<br>
- * Add an operator to the DAG which convert tuple T to tuple O
- * @param name operator name
- * @param mapFunction map function
- * @param <O> Type of the output
- * @return new stream of type O
- */
- <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mapFunction);
+ <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction, Option... opts);
/**
* Flat map transformation
@@ -65,17 +59,7 @@ public interface ApexStream<T>
* @param <O> Type of the output
* @return new stream of type O
*/
- <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten);
-
- /**
- * Flat map transformation<br>
- * Add an operator to the DAG which convert tuple T to a collection of tuple O
- * @param name operator name
- * @param flatten
- * @param <O> Type of the output
- * @return new stream of type O
- */
- <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten);
+ <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten, Option... opts);
/**
* Filter transformation<br>
@@ -83,76 +67,7 @@ public interface ApexStream<T>
* @param filter filter function
* @return new stream of same type
*/
- <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter);
-
- /**
- * Filter transformation<br>
- * Add an operator to the DAG which filter out tuple T that cannot satisfy the FilterFunction
- * @param name operator name
- * @param filter filter function
- * @return new stream of same type
- */
- <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter);
-
- /**
- * Reduce transformation<br>
- * Add an operator to the DAG which merge tuple t1, t2 to new tuple
- * @param reduce reduce function
- * @return new stream of same type
- */
- <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce);
-
- /**
- * Reduce transformation<br>
- * Add an operator to the DAG which merge tuple t1, t2 to new tuple
- * @param name operator name
- * @param reduce reduce function
- * @return new stream of same type
- */
- <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce);
-
- /**
- * Fold transformation<br>
- * Add an operator to the DAG which merge tuple T to accumulated result tuple O
- * @param initialValue initial result value
- * @param fold fold function
- * @param <O> Result type
- * @return new stream of type O
- */
- <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold);
-
- /**
- * Fold transformation<br>
- * Add an operator to the DAG which merge tuple T to accumulated result tuple O
- * @param name name of the operator
- * @param initialValue initial result value
- * @param fold fold function
- * @param <O> Result type
- * @return new stream of type O
- */
- <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold);
-
- /**
- * Count of all tuples
- * @return new stream of Integer
- */
- <STREAM extends ApexStream<Integer>> STREAM count();
-
- /**
- * Count tuples by the key<br>
- * If the input is KeyedTuple it will get the key from getKey method from the tuple<br>
- * If not, use the tuple itself as a key
- * @return new stream of Map
- */
- <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey();
-
- /**
- *
- * Count tuples by the indexed key
- * @param key the index of the field in the tuple that are used as key
- * @return new stream of Map
- */
- <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key);
+ <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter, Option... opts);
/**
* Extend the dag by adding one operator<br>
@@ -162,18 +77,23 @@ public interface ApexStream<T>
* @param <O> type of the output
* @return new stream of type O
*/
- <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort);
+ <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts);
/**
- * Extend the dag by adding one {@see Operator}
- * @param opName Operator name
+ * Extend the dag by adding one end operator<br>
* @param op Operator added to the stream
* @param inputPort InputPort of the operator that is connected to last exposed OutputPort in the stream
- * @param outputPort OutputPort of the operator will be connected to next operator
* @param <O> type of the output
* @return new stream of type O
*/
- <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort);
+ <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts);
+
+ /**
+ * Extend the dag by adding one {@link CompositeStreamTransform}
+ * @param compositeStreamTransform Composite Streams and Transforms
+ * @return new stream of type O
+ */
+ <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform);
/**
* Union multiple stream into one
@@ -260,4 +180,30 @@ public interface ApexStream<T>
*/
void run();
+ /**
+ * Chunk tuples into windows.<br>
+ * Windowed transforms are defined in {@link WindowedStream}
+ * @param windowOption how tuples are assigned to windows
+ * @return a windowed stream of the same tuple type
+ */
+ WindowedStream<T> window(WindowOption windowOption);
+
+ /**
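+ * Chunk tuples into windows with the given window option and trigger option
+ * @param windowOption how tuples are assigned to windows
+ * @param triggerOption trigger settings for the windows
+ * @return a windowed stream of the same tuple type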
+ */
+ WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption);
+
+ /**
+ *
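+ * Chunk tuples into windows with the given window option, trigger option and allowed lateness
+ * @param windowOption how tuples are assigned to windows
+ * @param triggerOption trigger settings for the windows
+ * @param allowLateness allowed lateness setting for the windows
+ * @return a windowed stream of the same tuple type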
+ */
+ WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness);
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
new file mode 100644
index 0000000..979f44f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A reusable group of streams and the transforms between them.
+ * <p>
+ * An implementation receives the stream built so far and returns the stream produced by its
+ * transforms. It is applied through {@link ApexStream#addCompositeStreams(CompositeStreamTransform)}.
+ *
+ * @param <INSTREAM> type of the stream this composite consumes
+ * @param <OUTSTREAM> type of the stream this composite produces
+ */
+@InterfaceStability.Evolving
+public abstract class CompositeStreamTransform<INSTREAM extends ApexStream, OUTSTREAM extends ApexStream>
+{
+ /**
+ * Builds this composite's transforms on top of {@code upstream}.
+ *
+ * @param upstream the stream to extend
+ * @return the stream produced by the composite's transforms
+ */
+ public abstract OUTSTREAM compose(INSTREAM upstream);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
new file mode 100644
index 0000000..1b8935f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Attribute;
+
+/**
+ * Options for the operators in the dag.
+ * <p>
+ * Stream transforms such as {@link ApexStream#map} accept a trailing list of options. Create them
+ * with the factory methods in {@link Options}, e.g. {@code Options.name("parser")}.
+ */
+@InterfaceStability.Evolving
+public interface Option
+{
+
+ /**
+ * Factory methods for the built-in {@link Option} implementations.
+ */
+ class Options
+ {
+ /**
+ * @param name name to give the operator
+ * @return an option that sets the operator name
+ */
+ public static Option name(String name)
+ {
+ return new OpName(name);
+ }
+
+ /**
+ * @param name name of the operator property
+ * @param value value to assign to that property
+ * @return an option that sets one operator property
+ */
+ public static Option prop(String name, Object value)
+ {
+ return new PropSetting(name, value);
+ }
+
+ /**
+ * @param attr the attribute to set
+ * @param obj value for the attribute
+ * @param <T> value type of the attribute
+ * @return an option that sets one {@link Attribute}
+ */
+ public static <T> Option attr(Attribute<T> attr, T obj)
+ {
+ return new AttributeSetting<>(attr, obj);
+ }
+ }
+
+ /**
+ * An option used to set the name of the operator
+ */
+ class OpName implements Option
+ {
+
+ // Assigned once at construction; the option is a value holder.
+ private final String name;
+
+ public OpName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+ }
+
+ /**
+ * An option used to set the property value of the operator
+ */
+ class PropSetting implements Option
+ {
+
+ private final String name;
+
+ private final Object val;
+
+ public PropSetting(String name, Object val)
+ {
+ this.name = name;
+ this.val = val;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public Object getVal()
+ {
+ return val;
+ }
+ }
+
+ /**
+ * An option used to set the {@link Attribute}
+ * @param <T> value type of the attribute
+ */
+ class AttributeSetting<T> implements Option
+ {
+ private final Attribute<T> attr;
+
+ private final T value;
+
+ public AttributeSetting(Attribute<T> attr, T value)
+ {
+ this.attr = attr;
+ this.value = value;
+ }
+
+ public Attribute<T> getAttr()
+ {
+ return attr;
+ }
+
+ public T getValue()
+ {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index 748a76a..bc99035 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -18,6 +18,22 @@
*/
package org.apache.apex.malhar.stream.api;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* <p>
* A stream with windowed transformation
@@ -51,6 +67,140 @@ package org.apache.apex.malhar.stream.api;
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public interface WindowedStream<T> extends ApexStream<T>
{
+
+ /**
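+ * Count the tuples in each window
+ * @param opts operator options such as name, properties and attributes (see {@link Option.Options})
+ * @return new stream of windowed Long counts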
+ */
+ <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts);
+
+ /**
+ * Count tuples by the key<br>
+ * @param convertToKeyValue The function that converts a plain tuple to a k,v pair
+ * @param opts operator options such as name, properties and attributes (see {@link Option.Options})
+ * @return new stream of Key Value Pair
+ */
+ <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts);
+
+ /**
+ * Return top N tuples by the selected key
+ * @param N how many tuples you want to keep
+ * @param convertToKeyVal The function that converts a plain tuple to a k,v pair
+ * @param opts operator options such as name, properties and attributes (see {@link Option.Options})
+ * @return new stream of Key and top N tuple of the key
+ */
+ <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+ /**
+ * Return top N tuples of all tuples in the window
+ * @param N how many tuples to keep
+ * @param opts operator options such as name, properties and attributes (see {@link Option.Options})
+ * @return new stream of topN
+ */
+ <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts);
+
+ /**
+ * Add {@link KeyedWindowedOperatorImpl} with specified {@link Accumulation} <br>
+ * Accumulate tuples by some key within the window definition in this stream
+ * Also give a name to the accumulation
+ * @param accumulation Accumulation function you want to do
+ * @param convertToKeyVal The function convert plain tuple to k,v pair
+ * @param <K> The type of the key used to group tuples
+ * @param <V> The type of value you want to do accumulation on
+ * @param <O> The output type for each given key that you want to accumulate the value to
+ * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk)
+ * @param <STREAM> return type
+ * @return
+ */
+ <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation,
+ Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+ /**
+ * Add {@link WindowedOperatorImpl} with specified {@link Accumulation} <br>
+ * Accumulate tuples by some key within the window definition in this stream
+ * Also give a name to the accumulation
+ * @param accumulation Accumulation function you want to do
+ * @param <O> The output type that you want to accumulate the value to
+ * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk)
+ * @param <STREAM> return type
+ * @return
+ */
+ <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts);
+
+ /**
+ * Add {@link WindowedOperatorImpl} with specified {@link ReduceFn} <br>
+ * Do reduce transformation<br>
+ * @param reduce reduce function
+ * @param <STREAM> return type
+ * @return new stream of same type
+ */
+ <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... opts);
+
+ /**
+ * Add {@link KeyedWindowedOperatorImpl} with specified {@link ReduceFn} <br>
+ * Reduce transformation by selected key <br>
+ * Add an operator to the DAG which merge tuple t1, t2 to new tuple by key
+ * @param reduce reduce function
+ * @param convertToKeyVal The function convert plain tuple to k,v pair
+ * @param <K> The type of key you want to group tuples by
+ * @param <V> The type of value extract from tuple T
+ * @param <STREAM> return type
+ * @return new stream of key value pair
+ */
+ <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+
+ /**
+ * Add {@link WindowedOperatorImpl} with specified {@link FoldFn} <br>
+ * Fold transformation <br>
+ * @param fold fold function
+ * @param <O> output type of fold function
+ * @param <STREAM> return type
+ * @return
+ */
+ <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts);
+
+ /**
+ * Add {@link KeyedWindowedOperatorImpl} with specified {@link FoldFn} <br>
+ * Fold transformation by key <br>
+ * @param fold fold function
+ * @param <O> Result type
+ * @return new stream of type O
+ */
+ <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+
+ /**
+ * Return tuples for each key for each window
+ * @param <O>
+ * @param <K>
+ * @param <STREAM>
+ * @return
+ */
+ <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... opts);
+
+ /**
+ * Return tuples for each window
+ * @param <STREAM>
+ * @return
+ */
+ <STREAM extends WindowedStream<Iterable<T>>> STREAM group();
+
+ /**
+ * Reset the trigger settings for next transforms
+ * @param triggerOption
+ * @param <STREAM>
+ */
+ <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption triggerOption);
+
+ /**
+ * Reset the allowedLateness settings for next transforms
+ * @param allowedLateness
+ * @param <STREAM>
+ */
+ <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness);
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
index f4e5e60..d516064 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
@@ -18,11 +18,24 @@
*/
package org.apache.apex.malhar.stream.api.function;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
/**
- * The top level function interface
+ * The top level function interface <br>
+ * The function is wrapped by {@link FunctionOperator} <br>
+ * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br>
+ * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br>
+ * Anonymous functions are not fully supported. They must be <b>stateless</b> and should not be defined in any static context<br>
+ * If an anonymous function does not work, use a top-level function class instead<br>
+ * A top-level function class should have a public no-arg constructor
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public interface Function
{
/**
@@ -45,26 +58,18 @@ public interface Function
}
/**
- * An interface defines a reduce transformation
+ * A special map function to convert any pojo to key value pair datastructure
* @param <T>
+ * @param <K>
+ * @param <V>
*/
- public static interface ReduceFunction<T> extends Function
+ public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>>
{
- T reduce(T t1, T t2);
- }
- /**
- * An interface that defines a fold transformation
- * @param <I>
- * @param <O>
- */
- public static interface FoldFunction<I, O> extends Function
- {
- O fold(I input, O output);
}
/**
- * An interface that defines flatmap transforation
+ * An interface that defines flatmap transformation
* @param <I>
* @param <O>
*/
@@ -76,7 +81,8 @@ public interface Function
* An interface that defines filter transformation
* @param <T>
*/
- public static interface FilterFunction<T> extends MapFunction<T, Boolean>
+ public static interface FilterFunction<T> extends Function
{
+ boolean f(T input);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
index 2ff6d51..032cb03 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -27,28 +27,39 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.function.Function.FlatMapFunction;
import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
-import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
/**
- * Default stream implementation for ApexStream interface.
- * It creates the dag(execution plan) from stream api
+ * Default stream implementation for ApexStream interface. <br>
+ * It creates the dag(execution plan) from stream api <br>
+ * The dag won't be constructed until {@link #populateDag(DAG)} is called
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class ApexStreamImpl<T> implements ApexStream<T>
{
@@ -135,18 +146,17 @@ public class ApexStreamImpl<T> implements ApexStream<T>
}
}
+
/**
* Graph behind the stream
*/
- private DagMeta graph;
-
- private ApexStream<T> delegator;
+ protected DagMeta graph;
/**
- * Right now the stream only support single extend point
- * You can have multiple downstream operators connect to this single extend point though
+ * Right now the stream only support single extension point
+ * You can have multiple downstream operators connect to this single extension point though
*/
- private Brick<T> lastBrick;
+ protected Brick<T> lastBrick;
public Brick<T> getLastBrick()
{
@@ -163,13 +173,11 @@ public class ApexStreamImpl<T> implements ApexStream<T>
graph = new DagMeta();
}
- public ApexStreamImpl(ApexStream<T> apexStream)
+ public ApexStreamImpl(ApexStreamImpl<T> apexStream)
{
- this.delegator = apexStream;
- if (delegator != null && delegator instanceof ApexStreamImpl) {
- graph = ((ApexStreamImpl)delegator).graph;
- lastBrick = ((ApexStreamImpl<T>)delegator).lastBrick;
- }
+ //copy the variables over to the new ApexStreamImpl
+ graph = apexStream.graph;
+ lastBrick = apexStream.lastBrick;
}
public ApexStreamImpl(DagMeta graph)
@@ -184,128 +192,52 @@ public class ApexStreamImpl<T> implements ApexStream<T>
}
@Override
- public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf)
- {
- return map(mf.toString(), mf);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mf)
+ public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf, Option... opts)
{
FunctionOperator.MapFunctionOperator<T, O> opt = new FunctionOperator.MapFunctionOperator<>(mf);
- return (STREAM)addOperator(name, opt, opt.input, opt.output);
+ return addOperator(opt, opt.input, opt.output, opts);
}
- @Override
- public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten)
- {
- return flatMap(flatten.toString(), flatten);
- }
@Override
- @SuppressWarnings("unchecked")
- public <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten)
+ public <O, STREAM extends ApexStream<O>> STREAM flatMap(FlatMapFunction<T, O> flatten, Option... opts)
{
FunctionOperator.FlatMapFunctionOperator<T, O> opt = new FunctionOperator.FlatMapFunctionOperator<>(flatten);
- return (STREAM)addOperator(name, opt, opt.input, opt.output);
- }
-
- @Override
- public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter)
- {
- return filter(filter.toString(), filter);
+ return addOperator(opt, opt.input, opt.output, opts);
}
@Override
@SuppressWarnings("unchecked")
- public <STREAM extends ApexStream<T>> STREAM filter(String name, final Function.FilterFunction<T> filter)
+ public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter, Option... opts)
{
+ // Wrap filter in a FilterFunctionOperator and append it after the current tail, forwarding opts.
+ // NOTE(review): the @SuppressWarnings("unchecked") above looks stale now that the (STREAM)
+ // cast was dropped from the return statement.
FunctionOperator.FilterFunctionOperator<T> filterFunctionOperator = new FunctionOperator.FilterFunctionOperator<>(filter);
- return (STREAM)addOperator(name, filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output);
- }
-
- @Override
- public <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce)
- {
- return reduce(reduce.toString(), reduce);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce)
- {
- FunctionOperator.ReduceFunctionOperator<T> opt = new FunctionOperator.ReduceFunctionOperator<>(reduce);
- return (STREAM)addOperator(name, opt, opt.input, opt.output);
- }
-
- @Override
- public <O, STREAM extends ApexStream<O>> STREAM fold(final O initialValue, Function.FoldFunction<T, O> fold)
- {
- return fold(fold.toString(), initialValue, fold);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold)
- {
- FunctionOperator.FoldFunctionOperator<T, O> opt = new FunctionOperator.FoldFunctionOperator<>(fold, initialValue);
- return (STREAM)addOperator(name, opt, opt.input, opt.output);
- }
-
- @Override
- public <STREAM extends ApexStream<Integer>> STREAM count()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key)
- {
- throw new UnsupportedOperationException();
+ return addOperator(filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output, opts);
}
- @Override
- @SuppressWarnings("unchecked")
- public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey()
+ /**
+ * Placeholder for a per-element count transform; not implemented yet.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ public <STREAM extends ApexStream<Map.Entry<Object, Integer>>> STREAM countByElement()
{
- // Needs to change the unique counter to support keys
- UniqueCounter<Object> uniqueCounter = new UniqueCounter<>();
- uniqueCounter.setCumulative(true);
- Operator.OutputPort<? extends Map<Object, Integer>> resultPort = uniqueCounter.count;
- return (STREAM)addOperator("CounterByKey", uniqueCounter, (Operator.InputPort<T>)uniqueCounter.data, resultPort);
+ // Fail fast rather than returning null, which would only surface later as a
+ // NullPointerException at the caller's next chained call.
+ throw new UnsupportedOperationException();
}
@Override
- public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+ public <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts)
{
- return addOperator(op.toString(), op, inputPort, outputPort);
+ // Attach op only through its input port (the output port passed to addOperator is null).
+ // NOTE(review): the (STREAM) cast is unchecked and, unlike addOperator, this method carries
+ // no @SuppressWarnings("unchecked").
+ return (STREAM)addOperator(op, inputPort, null, opts);
}
-
@Override
@SuppressWarnings("unchecked")
- public <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+ public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts)
{
- if (delegator != null) {
- ApexStreamImpl<O> apexStream = delegator.addOperator(opName, op, inputPort, outputPort);
- try {
- return (STREAM)this.getClass().getConstructor(ApexStream.class).newInstance(apexStream);
- } catch (Exception e) {
- throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as delegator");
- }
- }
-
checkArguments(op, inputPort, outputPort);
DagMeta.NodeMeta nm = null;
if (lastBrick == null) {
- nm = graph.addNode(opName, op, null, null, inputPort);
+ nm = graph.addNode(op, null, null, inputPort, opts);
} else {
-
- nm = graph.addNode(opName, op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort);
+ nm = graph.addNode(op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort, opts);
}
Brick<O> newBrick = new Brick<>();
@@ -315,9 +247,25 @@ public class ApexStreamImpl<T> implements ApexStream<T>
newBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(lastBrick.lastOutput, inputPort);
}
- return (STREAM)new ApexStreamImpl<>(this.graph, newBrick);
+ if (this.getClass() == ApexStreamImpl.class || this.getClass() == ApexWindowedStreamImpl.class) {
+ return (STREAM)newStream(this.graph, newBrick);
+ } else {
+ try {
+ return (STREAM)this.getClass().getConstructor(ApexStreamImpl.class).newInstance(newStream(this.graph, newBrick));
+ } catch (Exception e) {
+ throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as default parameter", e);
+ }
+ }
+
+ }
+
+ /**
+ * Apply a composite transform by handing this stream to {@code compositeStreamTransform.compose}
+ * and returning whatever stream it produces.
+ * NOTE(review): the (INSTREAM) cast is unchecked; if the transform expects a more specific stream
+ * type than this instance actually is, the failure surfaces inside compose().
+ */
+ @Override
+ public <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform)
+ {
+ return compositeStreamTransform.compose((INSTREAM)this);
}
+
/* Check to see if inputPort and outputPort belongs to the operator */
private void checkArguments(Operator op, Operator.InputPort inputPort, Operator.OutputPort outputPort)
{
@@ -362,8 +310,8 @@ public class ApexStreamImpl<T> implements ApexStream<T>
public ApexStreamImpl<T> print()
{
+ // Attach a ConsoleOutputOperator (no output port) named with an id from
+ // IDGenerator.generateOperatorIDWithUUID. The stream returned by addOperator is discarded
+ // and this stream is returned.
ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator();
- addOperator(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()), consoleOutputOperator,
- (Operator.InputPort<T>)consoleOutputOperator.input, null);
+ addOperator(consoleOutputOperator,
+ (Operator.InputPort<T>)consoleOutputOperator.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass())));
return this;
}
@@ -469,6 +417,7 @@ public class ApexStreamImpl<T> implements ApexStream<T>
{
LocalMode lma = LocalMode.newInstance();
populateDag(lma.getDAG());
+ DAG dag = lma.getDAG();
LocalMode.Controller lc = lma.getController();
if (lc instanceof StramLocalCluster) {
((StramLocalCluster)lc).setExitCondition(exitCondition);
@@ -493,5 +442,36 @@ public class ApexStreamImpl<T> implements ApexStream<T>
//TODO need an api to submit the StreamingApplication to cluster
}
+ /**
+ * Window this stream with only a window option: no trigger option and no allowed lateness.
+ */
+ @Override
+ public WindowedStream<T> window(WindowOption option)
+ {
+ final TriggerOption noTrigger = null;
+ final Duration noLateness = null;
+ return this.window(option, noTrigger, noLateness);
+ }
+
+ /**
+ * Window this stream with a window option and a trigger option, without allowed lateness.
+ */
+ @Override
+ public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption)
+ {
+ return this.window(windowOption, triggerOption, (Duration)null);
+ }
+
+ /**
+ * Create a windowed view of this stream that shares its DAG and current tail and records the
+ * given windowing configuration (any argument may be null).
+ */
+ @Override
+ public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness)
+ {
+ ApexWindowedStreamImpl<T> result = new ApexWindowedStreamImpl<>();
+ // share the DAG and the current tail with the new windowed view
+ result.graph = this.graph;
+ result.lastBrick = this.lastBrick;
+ // record the configuration later used by the windowed operators of this view
+ result.windowOption = windowOption;
+ result.triggerOption = triggerOption;
+ result.allowedLateness = allowLateness;
+ return result;
+ }
+
+ /**
+ * Factory hook used by addOperator to build the stream that follows a newly added operator.
+ * Subclasses override it to return their own stream type.
+ *
+ * @param graph DAG shared with the new stream
+ * @param newBrick tail of the new stream
+ * @return a plain ApexStreamImpl positioned at newBrick
+ */
+ protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick)
+ {
+ ApexStreamImpl<O> created = new ApexStreamImpl<>();
+ created.lastBrick = newBrick;
+ created.graph = graph;
+ return created;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
new file mode 100644
index 0000000..a293ea8
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+
+import org.apache.apex.malhar.stream.api.impl.accumulation.Count;
+import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.TopN;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Default implementation of the {@link WindowedStream} interface.
+ * It adds windowed transforms on top of the plain {@link ApexStreamImpl} stream operations.
+ *
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements WindowedStream<T>
+{
+
+ protected WindowOption windowOption;
+
+ protected TriggerOption triggerOption;
+
+ protected Duration allowedLateness;
+
+ /**
+ * Wraps each input in a timestamped tuple: inputs that already are
+ * {@link Tuple.TimestampedTuple}s are passed through, others are stamped with
+ * System.currentTimeMillis().
+ * <p>
+ * Static on purpose: an instance ends up inside a map operator of the DAG, and a non-static
+ * inner class would drag a hidden reference to this stream (and its whole DagMeta) with it.
+ */
+ private static class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>>
+ {
+
+ @Override
+ public Tuple<T> f(T input)
+ {
+ if (input instanceof Tuple.TimestampedTuple) {
+ return (Tuple.TimestampedTuple)input;
+ } else {
+ return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), input);
+ }
+ }
+ }
+
+ /**
+ * Maps every input to a timestamped {@code 1L}, reusing the input's timestamp when it is a
+ * {@link Tuple.TimestampedTuple} and System.currentTimeMillis() otherwise.
+ * Static for the same reason as {@link ConvertFn}: it must not capture the enclosing stream.
+ */
+ private static class ToCountOneFn<T> implements Function.MapFunction<T, Tuple<Long>>
+ {
+ @Override
+ public Tuple<Long> f(T input)
+ {
+ if (input instanceof Tuple.TimestampedTuple) {
+ return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)input).getTimestamp(), 1L);
+ } else {
+ return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), 1L);
+ }
+ }
+ }
+
+
+ public ApexWindowedStreamImpl()
+ {
+ }
+
+ @Override
+ public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts)
+ {
+ // Turn every input into a timestamped 1L, then aggregate per window with the Count accumulation.
+ WindowedStream<Tuple<Long>> innerstream = map(new ToCountOneFn<T>());
+ WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count());
+ return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+ }
+
+ @Override
+ public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
+ {
+ WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
+ KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count());
+ return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+ }
+
+ @Override
+ public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+ {
+ TopN<V> top = new TopN<>();
+ top.setN(N);
+ WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+ KeyedWindowedOperatorImpl<K, V, List<V>, List<V>> keyedWindowedOperator = createKeyedWindowedOperator(top);
+ return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+ }
+
+ @Override
+ public <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts)
+ {
+
+ TopN<T> top = new TopN<>();
+ top.setN(N);
+ WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+ WindowedOperatorImpl<T, List<T>, List<T>> windowedOperator = createWindowedOperator(top);
+ return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+ }
+
+
+ @Override
+ public <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation,
+ Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+ {
+ WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+ KeyedWindowedOperatorImpl<K, V, ACCU, O> keyedWindowedOperator = createKeyedWindowedOperator(accumulation);
+ return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+ }
+
+
+ @Override
+ public <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts)
+ {
+ WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+ WindowedOperatorImpl<T, ACCU, O> windowedOperator = createWindowedOperator(accumulation);
+ return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+ }
+
+
+ @Override
+ public <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... opts)
+ {
+ WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+ WindowedOperatorImpl<T, T, T> windowedOperator = createWindowedOperator(reduce);
+ return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+ }
+
+ @Override
+ public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+ {
+ WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+ KeyedWindowedOperatorImpl<K, V, V, V> keyedWindowedOperator = createKeyedWindowedOperator(reduce);
+ return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+ }
+
+ @Override
+ public <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts)
+ {
+ WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+ WindowedOperatorImpl<T, O, O> windowedOperator = createWindowedOperator(fold);
+ return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+ }
+
+ @Override
+ public <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+ {
+ WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+ KeyedWindowedOperatorImpl<K, V, O, O> keyedWindowedOperator = createKeyedWindowedOperator(fold);
+ return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+
+ }
+
+ @Override
+ public <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... opts)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <STREAM extends WindowedStream<Iterable<T>>> STREAM group()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Replace the trigger option of this stream in place and return this stream.
+ * Streams already derived from it keep the option they copied in newStream().
+ */
+ @Override
+ public <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption option)
+ {
+ triggerOption = option;
+ return (STREAM)this;
+ }
+
+ /**
+ * Replace the allowed lateness of this stream in place and return this stream.
+ * Streams already derived from it keep the value they copied in newStream().
+ */
+ @Override
+ public <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness)
+ {
+ this.allowedLateness = allowedLateness;
+ return (STREAM)this;
+ }
+
+ /**
+ * Build the windowed stream that follows a newly added operator, carrying over this stream's
+ * window option, trigger option and allowed lateness.
+ */
+ @Override
+ protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick)
+ {
+ ApexWindowedStreamImpl<O> newstream = new ApexWindowedStreamImpl<>();
+ newstream.graph = graph;
+ newstream.lastBrick = newBrick;
+ newstream.windowOption = this.windowOption;
+ newstream.triggerOption = this.triggerOption;
+ newstream.allowedLateness = this.allowedLateness;
+ return newstream;
+ }
+
+ /**
+ * Create the windowed operator for a non-keyed windowed transformation. It uses in-memory data,
+ * retraction and window-state storages and applies whichever of windowOption, triggerOption and
+ * allowedLateness are set (non-null) on this stream.
+ * @param accumulationFn accumulation the operator is configured with
+ * @param <IN> input value type
+ * @param <ACCU> accumulated (intermediate) type
+ * @param <OUT> output type
+ * @return the configured operator; the caller still has to add it to the DAG
+ */
+ private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn)
+ {
+ WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>();
+ //TODO use other default setting in the future
+ windowedOperator.setDataStorage(new InMemoryWindowedStorage<ACCU>());
+ windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<OUT>());
+ windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ if (windowOption != null) {
+ windowedOperator.setWindowOption(windowOption);
+ }
+ if (triggerOption != null) {
+ windowedOperator.setTriggerOption(triggerOption);
+ }
+ if (allowedLateness != null) {
+ windowedOperator.setAllowedLateness(allowedLateness);
+ }
+ windowedOperator.setAccumulation(accumulationFn);
+ return windowedOperator;
+ }
+
+ /**
+ * Keyed counterpart of createWindowedOperator. It uses in-memory keyed data and retraction
+ * storages and an in-memory window-state storage, and applies the same optional window settings.
+ * @param accumulationFn accumulation applied per key
+ * @param <K> key type
+ * @param <V> input value type
+ * @param <ACCU> accumulated (intermediate) type
+ * @param <OUT> output type
+ * @return the configured operator; the caller still has to add it to the DAG
+ */
+ private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn)
+ {
+ KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
+
+ //TODO use other default setting in the future
+ keyedWindowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<K, ACCU>());
+ keyedWindowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<K, OUT>());
+ keyedWindowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+ if (windowOption != null) {
+ keyedWindowedOperator.setWindowOption(windowOption);
+ }
+ if (triggerOption != null) {
+ keyedWindowedOperator.setTriggerOption(triggerOption);
+ }
+ if (allowedLateness != null) {
+ keyedWindowedOperator.setAllowedLateness(allowedLateness);
+ }
+
+ keyedWindowedOperator.setAccumulation(accumulationFn);
+ return keyedWindowedOperator;
+ }
+
+}