You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:16 UTC
[11/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..9fd9495
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * MaxPerKeyExamples Application from Beam
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MaxPerKeyExamples")
+public class MaxPerKeyExamples implements StreamingApplication
+{
+
+ /**
+ * A map function to extract the mean temperature from {@link InputPojo}.
+ */
+ public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
+ {
+   @Override
+   public KeyValPair<Integer, Double> f(InputPojo row)
+   {
+     // Key each reading by its month; the value is that reading's mean temperature.
+     return new KeyValPair<Integer, Double>(
+         row.getMonth(), row.getMeanTemp());
+   }
+ }
+
+
+ /** A map function that copies the key and value of a windowed pair into an {@link OutputPojo}. */
+ public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
+ {
+   @Override
+   public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
+   {
+     // Unwrap the windowed tuple once; its key becomes the month and its value the mean temperature.
+     KeyValPair<Integer, Double> monthAndTemp = input.getValue();
+     OutputPojo formatted = new OutputPojo();
+     formatted.setMonth(monthAndTemp.getKey());
+     formatted.setMeanTemp(monthAndTemp.getValue());
+     return formatted;
+   }
+ }
+
+ /**
+ * A composite transformation to perform three tasks:
+ * 1. extract the month and its mean temperature from input pojo.
+ * 2. find the maximum mean temperature for every month.
+ * 3. format the result to an output pojo object.
+ */
+ public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
+ {
+   @Override
+   public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
+   {
+     // Wraps every <month, meanTemp> pair in a windowed tuple tagged with the global window instance.
+     Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double> toGlobalWindowTuple =
+         new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
+         {
+           @Override
+           public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> pair)
+           {
+             return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, pair);
+           }
+         };
+     // Step 1: InputPojo... => <month, meanTemp>...
+     WindowedStream<KeyValPair<Integer, Double>> monthlyTemps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
+
+     // Step 2: <month, meanTemp>... => <month, max mean temp>...
+     WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> monthlyMaxes =
+         monthlyTemps.accumulateByKey(new Max<Double>(), toGlobalWindowTuple, name("MaxPerMonth"));
+
+     // Step 3: <month, max>... => OutputPojo...
+     return monthlyMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
+   }
+ }
+
+ /**
+  * Builds the field infos used to configure the {@link JdbcPOJOInputOperator}.
+  * @return a mutable list of field infos for the MONTH, DAY, YEAR and MEANTEMP entries
+  */
+ private List<FieldInfo> addInputFieldInfos()
+ {
+   // Each entry names an upper-case column, the matching InputPojo field and its type (argument roles presumed - confirm).
+   return Lists.newArrayList(
+       new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER),
+       new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER),
+       new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER),
+       new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
+ }
+
+ /**
+  * Builds the field infos used to configure the {@link JdbcPOJOInsertOutputOperator}.
+  * @return a mutable list of field infos for the MONTH and MEANTEMP entries
+  */
+ private List<JdbcFieldInfo> addOutputFieldInfos()
+ {
+   // The trailing INTEGER and DOUBLE arguments are the statically imported java.sql.Types constants.
+   JdbcFieldInfo monthInfo = new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER);
+   JdbcFieldInfo meanTempInfo = new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE);
+   return Lists.newArrayList(monthInfo, meanTempInfo);
+ }
+
+
+ /**
+ * Populates the DAG using the high-level stream API: JDBC input, maximum mean temperature per month, JDBC output.
+ * @param dag the DAG that the assembled stream is populated into
+ * @param conf the launch configuration; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
+ jdbcInput.setFieldInfos(addInputFieldInfos());
+
+ JdbcStore store = new JdbcStore();
+ jdbcInput.setStore(store);
+
+ JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+ jdbcOutput.setFieldInfos(addOutputFieldInfos());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ jdbcOutput.setStore(outputStore);
+
+ // Create a stream that reads from the JDBC input operator's output port.
+ ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
+
+ // Apply a global window with accumulating fired panes and early firings at every 1 (presumably a tuple count - confirm).
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // The JDBC input emits plain Objects, so cast each tuple to InputPojo.
+ .map(new Function.MapFunction<Object, InputPojo>()
+ {
+ @Override
+ public InputPojo f(Object input)
+ {
+ return (InputPojo)input;
+ }
+ }, name("ObjectToInputPojo"))
+
+ // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
+ .addCompositeStreams(new MaxMeanTemp())
+
+ // Widen each resulting OutputPojo to Object for the JDBC output to consume.
+ .map(new Function.MapFunction<OutputPojo, Object>()
+ {
+ @Override
+ public Object f(OutputPojo input)
+ {
+ return (Object)input;
+ }
+ }, name("OutputPojoToObject"))
+
+ // Terminate the stream at the JDBC output operator's input port.
+ .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
+
+ stream.populateDag(dag);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
new file mode 100644
index 0000000..f3d0c64
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}.
+ *
+ * @since 3.5.0
+ */
+ public class OutputPojo
+ {
+   private int month; // month the row belongs to; mapped to the MONTH column by MaxPerKeyExamples
+   private double meanTemp; // mean temperature of the row; mapped to the MEANTEMP column
+
+   public int getMonth()
+   {
+     return this.month;
+   }
+
+   public void setMonth(int month)
+   {
+     this.month = month;
+   }
+
+   public double getMeanTemp()
+   {
+     return meanTemp;
+   }
+
+   public void setMeanTemp(double meanTemp)
+   {
+     this.meanTemp = meanTemp;
+   }
+
+   @Override
+   public String toString()
+   {
+     return "OutputPojo [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]"; // label matches this class
+   }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
new file mode 100644
index 0000000..962faa5
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into {@link Window windows} to be processed, and demonstrates using various kinds of
+ * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
+ * each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ * 1. The default triggering behavior
+ * 2. Late data with the default trigger
+ * 3. How to get speculative estimates
+ * 4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data',
+ * See: <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger,
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
+ * the window duration, until the first pane of non-late data has been emitted, to see more
+ * interesting results.
+ * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data, i.e. the data dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p>To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processingTime}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processingTime}
+ *
+ * <p> To see speculative results every five minutes after the end of the window
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query we can add additional where clauses.
+ * For example, to see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exits.
+ *
+ * @since 3.5.0
+ */
+
+public class TriggerExample
+{
+ //Numeric value of fixed window duration, in minutes
+ public static final int WINDOW_DURATION = 30;
+ // Constants used in triggers.
+ // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+ // ONE_MINUTE is used only with processing time before the end of the window
+ public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+ // FIVE_MINUTES is used only with processing time after the end of the window
+ public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+ // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+ public static final Duration ONE_DAY = Duration.standardDays(1);
+
+ /**
+ * This transform demonstrates using triggers to control when data is produced for each window
+ * Consider an example to understand the results generated by each type of trigger.
+ * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+ * element and processing time is the time when the data element gets processed in the pipeline.
+ * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+ * Key (freeway) | Value (totalFlow) | event time | processing time
+ * 5 | 50 | 10:00:03 | 10:00:47
+ * 5 | 30 | 10:01:00 | 10:01:03
+ * 5 | 30 | 10:02:00 | 11:07:00
+ * 5 | 20 | 10:04:10 | 10:05:15
+ * 5 | 60 | 10:05:00 | 11:03:00
+ * 5 | 20 | 10:05:01 | 11:07:30
+ * 5 | 60 | 10:15:00 | 10:27:15
+ * 5 | 40 | 10:26:40 | 10:26:43
+ * 5 | 60 | 10:27:20 | 10:27:25
+ * 5 | 60 | 10:29:00 | 11:11:00
+ *
+ * <p> Dataflow tracks a watermark which records up to what point in event time the data is
+ * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+ * behind the current processing time. In practice, the actual value would vary over time based
+ * on the system's knowledge of the current PubSub delay and contents of the backlog (data
+ * that has not yet been processed).
+ *
+ * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+ * close at 10:44:59, when the watermark passes 10:30:00.
+ */
+ static class CalculateTotalFlow
+     extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
+ {
+   private int windowDuration; // size of the fixed time windows, in minutes
+
+   CalculateTotalFlow(int windowDuration)
+   {
+     this.windowDuration = windowDuration;
+   }
+
+   @Override
+   public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
+   {
+     // Concept #1: The default triggering behavior
+     // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
+     // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+     // The system also defaults to dropping late data -- data which arrives after the watermark
+     // has passed the event timestamp of the arriving element. This means that the default trigger
+     // will only fire once.
+
+     // Each pane produced by the default trigger with no allowed lateness will be the first and
+     // last pane in the window, and will be ON_TIME.
+
+     // The results for the example above with the default trigger and zero allowed lateness
+     // would be:
+     // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+     // 5 | 260 | 6 | true | true | ON_TIME
+
+     // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+     // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+     // late, and dropped.
+
+     WindowedStream<SampleBean> defaultTriggerResults = inputStream
+         .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+         new TriggerOption().discardingFiredPanes())
+         .addCompositeStreams(new TotalFlow("default"));
+
+     // Concept #2: Late data with the default trigger
+     // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+     // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+     // window. Any late data will result in an additional pane being fired for that same window.
+
+     // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+     // To definitely get the last pane when the window closes, use
+     // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+     // The results for the example above with the default trigger and ONE_DAY allowed lateness
+     // would be:
+     // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+     // 5 | 260 | 6 | true | false | ON_TIME
+     // 5 | 60 | 1 | false | false | LATE
+     // 5 | 30 | 1 | false | false | LATE
+     // 5 | 20 | 1 | false | false | LATE
+     // 5 | 60 | 1 | false | false | LATE
+     WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
+         .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+         new TriggerOption().discardingFiredPanes(),
+         ONE_DAY)
+         .addCompositeStreams(new TotalFlow("withAllowedLateness"));
+
+     // Concept #3: How to get speculative estimates
+     // We can specify a trigger that fires independent of the watermark, for instance after
+     // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+     // all the data is available. Since we don't have any triggers that depend on the watermark
+     // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+     // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+     // The results for the example above for this trigger would be:
+     // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+     // 5 | 80 | 2 | true | false | EARLY
+     // 5 | 100 | 3 | false | false | EARLY
+     // 5 | 260 | 6 | false | false | EARLY
+     // 5 | 320 | 7 | false | false | LATE
+     // 5 | 370 | 9 | false | false | LATE
+     // 5 | 430 | 10 | false | false | LATE
+
+     ApexStream<SampleBean> speculativeResults = inputStream
+         .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+         // Trigger fires every ONE_MINUTE
+         new TriggerOption().withEarlyFiringsAtEvery(ONE_MINUTE)
+         // After emitting each pane, it will continue accumulating the elements so that each
+         // approximation includes all of the previous data in addition to the newly arrived
+         // data.
+         .accumulatingFiredPanes(),
+         ONE_DAY)
+         .addCompositeStreams(new TotalFlow("speculative"));
+
+     // Concept #4: Combining late data and speculative estimates
+     // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+     // and LATE updates based on late data.
+
+     // Each time a triggering condition is satisfied it advances to the next trigger.
+     // If there are new elements this trigger emits a window under the following conditions:
+     // > Early approximations every minute till the end of the window.
+     // > An on-time firing when the watermark has passed the end of the window
+     // > Every five minutes of late data.
+
+     // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+     // The results for the example above for this trigger would be:
+     // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+     // 5 | 80 | 2 | true | false | EARLY
+     // 5 | 100 | 3 | false | false | EARLY
+     // 5 | 260 | 6 | false | false | EARLY
+     // [First pane fired after the end of the window]
+     // 5 | 320 | 7 | false | false | ON_TIME
+     // 5 | 430 | 10 | false | false | LATE
+
+     // For more possibilities of how to build advanced triggers, see TriggerOption.
+     WindowedStream<SampleBean> sequentialResults = inputStream
+         .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+         // Speculative every ONE_MINUTE
+         new TriggerOption().withEarlyFiringsAtEvery(ONE_MINUTE)
+         .withLateFiringsAtEvery(FIVE_MINUTES)
+         // After emitting each pane, it will continue accumulating the elements so that each
+         // approximation includes all of the previous data in addition to the newly arrived
+         // data.
+         .accumulatingFiredPanes(),
+         ONE_DAY)
+         .addCompositeStreams(new TotalFlow("sequential"));
+     // Only the concept #4 stream is returned; the streams for concepts #1-#3 are built above but not handed back.
+     return sequentialResults;
+   }
+
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The remaining parts of the pipeline are needed to produce the output for each
+ // concept above. Not directly relevant to understanding the trigger examples.
+
+ /**
+ * Calculate total flow and number of records for each freeway and format the results to TableRow
+ * objects, to save to BigQuery.
+ */
+ static class TotalFlow extends
+ CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
+ {
+ private String triggerType;
+
+ public TotalFlow(String triggerType)
+ {
+ this.triggerType = triggerType;
+ }
+
+ @Override
+ public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
+ {
+
+ WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
+ .groupByKey(new ExtractFlowInfo());
+
+ return flowPerFreeway
+ .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
+ {
+ @Override
+ public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
+ {
+ Iterable<Integer> flows = input.getValue();
+ Integer sum = 0;
+ Long numberOfRecords = 0L;
+ for (Integer value : flows) {
+ sum += value;
+ numberOfRecords++;
+ }
+ return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
+ }
+ })
+ .map(new FormatTotalFlow(triggerType));
+ }
+ }
+
+ /**
+  * Turns a {@code <freeway, "totalFlow,numberOfRecords">} pair into a {@link SampleBean}.
+  * Sets the trigger type, freeway, both totals and the processing time; window and pane fields are left unset.
+  */
+ static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
+ {
+   private String triggerType;
+
+   public FormatTotalFlow(String triggerType)
+   {
+     this.triggerType = triggerType;
+   }
+
+   @Override
+   public SampleBean f(KeyValPair<String, String> input)
+   {
+     //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
+     String[] flowAndCount = input.getValue().split(",");
+     return new SampleBean(triggerType, input.getKey(), Integer.parseInt(flowAndCount[0]),
+         Long.parseLong(flowAndCount[1]), null, false, false, null, null, new Date());
+   }
+ }
+
+ public static class SampleBean
+ {
+   // What was aggregated, and under which trigger configuration.
+   private String triggerType;
+   private String freeway;
+
+   // Aggregated values.
+   private int totalFlow;
+   private long numberOfRecords;
+
+   // Window and pane metadata.
+   private String window;
+   private boolean isFirst;
+   private boolean isLast;
+   private Date timing;
+
+   // Timestamps.
+   private Date eventTime;
+   private Date processingTime;
+
+   /** No-argument constructor; every field keeps its Java default value. */
+   public SampleBean()
+   {
+   }
+
+   /** Creates a bean with every field supplied by the caller. */
+   public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
+   {
+     this.triggerType = triggerType;
+     this.freeway = freeway;
+     this.totalFlow = totalFlow;
+     this.numberOfRecords = numberOfRecords;
+     this.window = window;
+     this.isFirst = isFirst;
+     this.isLast = isLast;
+     this.timing = timing;
+     this.eventTime = eventTime;
+     this.processingTime = processingTime;
+   }
+
+   public String getTriggerType()
+   {
+     return this.triggerType;
+   }
+
+   public void setTriggerType(String triggerType)
+   {
+     this.triggerType = triggerType;
+   }
+
+   public String getFreeway()
+   {
+     return this.freeway;
+   }
+
+   public void setFreeway(String freeway)
+   {
+     this.freeway = freeway;
+   }
+
+   public int getTotalFlow()
+   {
+     return this.totalFlow;
+   }
+
+   public void setTotalFlow(int totalFlow)
+   {
+     this.totalFlow = totalFlow;
+   }
+
+   public long getNumberOfRecords()
+   {
+     return this.numberOfRecords;
+   }
+
+   public void setNumberOfRecords(long numberOfRecords)
+   {
+     this.numberOfRecords = numberOfRecords;
+   }
+
+   public String getWindow()
+   {
+     return this.window;
+   }
+
+   public void setWindow(String window)
+   {
+     this.window = window;
+   }
+
+   public boolean isFirst()
+   {
+     return this.isFirst;
+   }
+
+   public void setFirst(boolean first)
+   {
+     this.isFirst = first;
+   }
+
+   public boolean isLast()
+   {
+     return this.isLast;
+   }
+
+   public void setLast(boolean last)
+   {
+     this.isLast = last;
+   }
+
+   public Date getTiming()
+   {
+     return this.timing;
+   }
+
+   public void setTiming(Date timing)
+   {
+     this.timing = timing;
+   }
+
+   public Date getEventTime()
+   {
+     return this.eventTime;
+   }
+
+   public void setEventTime(Date eventTime)
+   {
+     this.eventTime = eventTime;
+   }
+
+   public Date getProcessingTime()
+   {
+     return this.processingTime;
+   }
+
+   public void setProcessingTime(Date processingTime)
+   {
+     this.processingTime = processingTime;
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+     if (o == this) {
+       return true;
+     }
+     if (o == null || o.getClass() != getClass()) {
+       return false;
+     }
+     SampleBean other = (SampleBean)o;
+     // Compare the primitive fields first, then the reference fields null-safely.
+     boolean primitivesMatch = totalFlow == other.totalFlow
+         && numberOfRecords == other.numberOfRecords
+         && isFirst == other.isFirst
+         && isLast == other.isLast;
+     return primitivesMatch
+         && Objects.equals(triggerType, other.triggerType)
+         && Objects.equals(freeway, other.freeway)
+         && Objects.equals(window, other.window)
+         && Objects.equals(timing, other.timing)
+         && Objects.equals(eventTime, other.eventTime)
+         && Objects.equals(processingTime, other.processingTime);
+   }
+
+   @Override
+   public int hashCode()
+   {
+     return Objects.hash(triggerType, freeway, totalFlow, numberOfRecords, window,
+         isFirst, isLast, timing, eventTime, processingTime);
+   }
+ }
+
+ /**
+  * Extract the freeway (used as the key) and the total flow from one comma-separated reading.
+  * Returns null for rows with fewer than 48 columns, header rows, and rows whose total flow is not a positive integer.
+  */
+ static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
+ {
+   @Override
+   public Tuple<KeyValPair<String, Integer>> f(String input)
+   {
+     String[] laneInfo = input.split(",");
+     if (laneInfo.length < 48) {
+       // Skip the invalid input. Checked before indexing: split(",") yields an empty array for a comma-only line.
+       return null;
+     }
+     if ("timestamp".equals(laneInfo[0])) {
+       // Header row
+       return null;
+     }
+     String freeway = laneInfo[2];
+     Integer totalFlow = tryIntegerParse(laneInfo[7]);
+     // Ignore the records with total flow 0 to easily understand the working of triggers.
+     // Skip the records with total flow -1 since they are invalid input.
+     if (totalFlow == null || totalFlow <= 0) {
+       return null;
+     }
+     return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
+   }
+ }
+
+ private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+ public static void main(String[] args) throws Exception
+ {
+   // Build the transform first (60-minute windows), then attach it to the stream created by fromFolder.
+   CalculateTotalFlow totalFlow = new CalculateTotalFlow(60);
+   StreamFactory.fromFolder("some folder").addCompositeStreams(totalFlow);
+ }
+
+ private static Integer tryIntegerParse(String number)
+ {
+   try {
+     return Integer.valueOf(number);
+   } catch (NumberFormatException notAnInteger) {
+     return null;
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/resources/META-INF/properties.xml b/examples/highlevelapi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..ead0460
--- /dev/null
+++ b/examples/highlevelapi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!--
+ <property>
+ <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+ </property>
+ -->
+
+ <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work -->
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name>
+ <value></value>
+ </property>
+
+ <!-- Properties for StreamingWordExtract -->
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name>
+ <value>Test</value>
+ </property>
+
+ <!-- Properties for MaxPerKeyExamples -->
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name>
+ <value>InputTable</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name>
+ <value>SELECT * FROM InputTable;</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name>
+ <value>OutputTable</value>
+ </property>
+
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
new file mode 100644
index 0000000..c078683
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Runs {@link MinimalWordCount} on a local cluster and checks the word counts gathered by its collector.
+  */
+ public class MinimalWordCountTest
+ {
+   @Test
+   public void MinimalWordCountTest() throws Exception
+   {
+     LocalMode localMode = LocalMode.newInstance();
+     Configuration configuration = new Configuration(false);
+     configuration.set("dt.application.MinimalWordCount.operator.console.silent", "true");
+     localMode.prepareDAG(new MinimalWordCount(), configuration);
+
+     // The local cluster stops once the collector reports that it is done.
+     Callable<Boolean> collectorDone = new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return MinimalWordCount.Collector.isDone();
+       }
+     };
+
+     LocalMode.Controller controller = localMode.getController();
+     ((StramLocalCluster)controller).setExitCondition(collectorDone);
+     controller.run(10000);
+
+     // Expected counts for three selected words.
+     Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
+     Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
+     Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
new file mode 100644
index 0000000..f0c51f6
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Test for {@link WindowedWordCount}.
+  *
+  * Runs the application on a local cluster until its collector reports completion and then
+  * verifies the total number of counted words as well as the counts of a few selected words,
+  * summed over all windows.
+  */
+ public class WindowedWordCountTest
+ {
+   @Test
+   public void WindowedWordCountTest() throws Exception
+   {
+     LocalMode lma = LocalMode.newInstance();
+     Configuration conf = new Configuration(false);
+     conf.set("dt.application.WindowedWordCount.operator.console.silent", "true");
+     lma.prepareDAG(new WindowedWordCount(), conf);
+     LocalMode.Controller lc = lma.getController();
+     // Stop the local cluster as soon as the collector reports that it is done.
+     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return WindowedWordCount.Collector.isDone();
+       }
+     });
+
+     lc.run(60000);
+
+     Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
+     Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
+     Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
+     Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
+     Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
+   }
+
+   /**
+    * Adds up all counts in the given result map.
+    *
+    * @param map the collected result, keyed by a (Long, word) pair
+    * @return the sum of all values in the map
+    */
+   public long countSum(Map<KeyValPair<Long, String>, Long> map)
+   {
+     long sum = 0;
+     for (long count : map.values()) {
+       sum += count;
+     }
+     return sum;
+   }
+
+   /**
+    * Adds up the counts of all entries whose key carries the given word.
+    *
+    * @param map the collected result, keyed by a (Long, word) pair
+    * @param word the word to sum the counts for
+    * @return the sum of the values of all entries whose key value equals the word
+    */
+   public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
+   {
+     long sum = 0;
+     for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) {
+       // Compare from the known non-null side so a key without a word cannot cause an NPE.
+       if (word.equals(entry.getKey().getValue())) {
+         sum += entry.getValue();
+       }
+     }
+     return sum;
+   }
+
+ }
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
new file mode 100644
index 0000000..4ed2d5d
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Runs the {@link AutoComplete} application on a local cluster and checks the collected completions.
+  */
+ public class AutoCompleteTest
+ {
+
+   @Test
+   public void AutoCompleteTest() throws Exception
+   {
+     LocalMode localMode = LocalMode.newInstance();
+     Configuration configuration = new Configuration(false);
+     configuration.set("dt.application.AutoComplete.operator.console.silent", "true");
+     localMode.prepareDAG(new AutoComplete(), configuration);
+
+     // The local cluster stops once the collector reports that it is done.
+     Callable<Boolean> collectorDone = new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return AutoComplete.Collector.isDone();
+       }
+     };
+
+     LocalMode.Controller controller = localMode.getController();
+     ((StramLocalCluster)controller).setExitCondition(collectorDone);
+     controller.run(200000);
+
+     // Two expected result keys must be present, and the first entry for "mapreduce" has count 2.
+     Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("had"));
+     Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("hadoop"));
+     Assert.assertEquals(2, AutoComplete.Collector.getResult().get("mapreduce").get(0).getCount());
+   }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
new file mode 100644
index 0000000..dc9cdec
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing StreamingWordExtract application
+ */
+public class StreamingWordExtractTest
+{
+ private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent";
+ private static final String DB_DRIVER = "org.h2.Driver";
+ private static final String DB_URL = "jdbc:h2:~/test";
+ private static final String TABLE_NAME = "Test";
+ private static final String USER_NAME = "root";
+ private static final String PSW = "password";
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + "(STRINGVALUE VARCHAR(255))";
+ stmt.executeUpdate(createTable);
+
+ } catch (Throwable e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @After
+ public void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+ String dropTable = "drop table " + TABLE_NAME;
+ stmt.executeUpdate(dropTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setConfig(Configuration conf)
+ {
+ conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+ conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+ conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+ conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
+ }
+
+ public int getNumOfEventsInStore()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(*) from " + TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ @Test
+ public void StreamingWordExtractTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ setConfig(conf);
+ StreamingWordExtract app = new StreamingWordExtract();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return getNumOfEventsInStore() == 36;
+ }
+ });
+
+ lc.run(10000);
+
+ Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
+ Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
new file mode 100644
index 0000000..fddf511
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Testing the {@link TopWikipediaSessions} Application.
+  *
+  * Runs the application on a local cluster until the session generator has produced at least
+  * 250 tuples and then verifies that every collected result list is sorted in non-increasing order.
+  */
+ public class TopWikipediaSessionsTest
+ {
+   @Test
+   public void TopWikipediaSessionsTest() throws Exception
+   {
+     LocalMode lma = LocalMode.newInstance();
+     Configuration conf = new Configuration(false);
+     conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true");
+     lma.prepareDAG(new TopWikipediaSessions(), conf);
+     LocalMode.Controller lc = lma.getController();
+
+     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
+       }
+     });
+
+     lc.run(30000);
+
+     for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
+       Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
+     }
+   }
+
+   /**
+    * Checks that the values of the given list never increase from one element to the next.
+    *
+    * @param input the list to check
+    * @return true if every element's value is greater than or equal to its successor's value;
+    *         lists with fewer than two elements are trivially in order
+    */
+   public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
+   {
+     // Compare every adjacent pair, including the last one (i + 1 == size - 1).
+     for (int i = 0; i + 1 < input.size(); i++) {
+       if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) {
+         return false;
+       }
+     }
+     return true;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
new file mode 100644
index 0000000..766fa60
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Runs the {@link TrafficRoutes} application on a local cluster and validates the collected results.
+  */
+ public class TrafficRoutesTest
+ {
+
+   @Test
+   public void TrafficRoutesTest() throws Exception
+   {
+     LocalMode localMode = LocalMode.newInstance();
+     Configuration configuration = new Configuration(false);
+     configuration.set("dt.application.TrafficRoutes.operator.console.silent", "true");
+     localMode.prepareDAG(new TrafficRoutes(), configuration);
+
+     // The local cluster stops once the generator has produced at least 100 tuples.
+     Callable<Boolean> enoughTuples = new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return TrafficRoutes.InfoGen.getTupleCount() >= 100;
+       }
+     };
+
+     LocalMode.Controller controller = localMode.getController();
+     ((StramLocalCluster)controller).setExitCondition(enoughTuples);
+     controller.run(60000);
+
+     Assert.assertFalse(TrafficRoutes.Collector.getResult().isEmpty());
+     for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result : TrafficRoutes.Collector.getResult().entrySet()) {
+       // The numeric part of every result must lie within [55, 75].
+       double measured = result.getValue().getKey();
+       Assert.assertTrue(measured <= 75);
+       Assert.assertTrue(measured >= 55);
+       // Only the two known routes may show up in the result keys.
+       String route = result.getKey().getValue();
+       Assert.assertTrue(route.equals("SDRoute1") || route.equals("SDRoute2"));
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
new file mode 100644
index 0000000..9ba2f25
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+ /**
+  * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
+  * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties
+  * for the application before running it:
+  * Your application consumer key,
+  * Your application consumer secret,
+  * Your twitter access token, and
+  * Your twitter access token secret.
+  *
+  * This test is mainly for local demonstration purpose. Default time to run the application is 1 minute, please
+  * set the time you need to run the application before you run.
+  */
+ public class TwitterAutoCompleteTest
+ {
+   // Log under this test's own class name rather than AutoCompleteTest.
+   private static final Logger logger = LoggerFactory.getLogger(TwitterAutoCompleteTest.class);
+
+   /**
+    * Runs the TwitterAutoComplete application locally and logs the elapsed wall-clock time.
+    * Ignored by default because it needs the Twitter credentials described on the class.
+    */
+   @Test
+   @Ignore
+   public void TwitterAutoCompleteTest() throws Exception
+   {
+     LocalMode lma = LocalMode.newInstance();
+     Configuration conf = new Configuration(false);
+     //uncomment the following lines and change YOUR_XXX to the corresponding information needed.
+     //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+     //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
+     //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+     //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+     lma.prepareDAG(new TwitterAutoComplete(), conf);
+     LocalMode.Controller lc = lma.getController();
+     long start = System.currentTimeMillis();
+     lc.run(60000); // Set your desired time to run the application here.
+     long end = System.currentTimeMillis();
+     long time = end - start;
+     logger.info("Test used {} ms", time);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..1e14fff
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+ /**
+  * Runs {@link CombinePerKeyExamples} on a local cluster and checks the collected output.
+  */
+ public class CombinePerKeyExamplesTest
+ {
+   @Test
+   public void CombinePerKeyExamplesTest() throws Exception
+   {
+     LocalMode localMode = LocalMode.newInstance();
+     Configuration configuration = new Configuration(false);
+     configuration.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true");
+     localMode.prepareDAG(new CombinePerKeyExamples(), configuration);
+
+     // The local cluster stops once the collector reports that it is done.
+     Callable<Boolean> collectorDone = new Callable<Boolean>()
+     {
+       @Override
+       public Boolean call() throws Exception
+       {
+         return CombinePerKeyExamples.Collector.isDone();
+       }
+     };
+
+     LocalMode.Controller controller = localMode.getController();
+     ((StramLocalCluster)controller).setExitCondition(collectorDone);
+     controller.run(100000);
+
+     // The second-to-last collected element must contain the full corpus list.
+     Assert.assertTrue(
+         CombinePerKeyExamples.Collector.getResult()
+             .get(CombinePerKeyExamples.Collector.getResult().size() - 2)
+             .getCorpus()
+             .contains("1, 2, 3, 4, 5, 6, 7, 8"));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..7f93f50
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.stram.StramLocalCluster;
+
+
+/**
+ * Runs {@link DeDupExample} on a local cluster and checks what its collector
+ * gathered.
+ */
+public class DeDupExampleTest
+{
+  /**
+   * Prepares the application DAG in local mode, registers an exit condition
+   * backed by {@code DeDupExample.Collector.isDone()}, runs the controller
+   * and then asserts that the collected result holds nine values.
+   */
+  @Test
+  public void DeDupExampleTest() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration configuration = new Configuration(false);
+    configuration.set("dt.application.DeDupExample.operator.console.silent", "true");
+    localMode.prepareDAG(new DeDupExample(), configuration);
+
+    Callable<Boolean> collectorDone = new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return DeDupExample.Collector.isDone();
+      }
+    };
+
+    LocalMode.Controller controller = localMode.getController();
+    StramLocalCluster cluster = (StramLocalCluster)controller;
+    cluster.setExitCondition(collectorDone);
+    controller.run(50000);
+
+    int collectedValues = DeDupExample.Collector.getResult().getValue().size();
+    Assert.assertEquals(9, collectedValues);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..ec28b40
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application.
+ *
+ * <p>{@code setup()} creates and fills the tables in the database at
+ * {@code DB_URL}, the test runs the application in local mode, and the rows
+ * found in the output table are compared with {@code MEANTEMPS}.
+ * {@code cleanup()} drops the input and output tables afterwards.
+ *
+ * <p>Every JDBC connection, statement and result set opened by this class is
+ * closed through try-with-resources. {@code DB_URL} names a file-backed H2
+ * database, so the data is expected to survive closing a connection.
+ */
+public class MaxPerKeyExamplesTest
+{
+
+  private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+  private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String INPUT_TABLE = "InputTable";
+  private static final String OUTPUT_TABLE = "OutputTable";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+  private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+
+  // Expected maximum MEANTEMP for month 6 and month 7, in that order.
+  private static final double[] MEANTEMPS = {85.3, 75.4};
+
+  /**
+   * Opens a new connection to the test database. The caller owns the
+   * connection and must close it.
+   *
+   * @return a freshly opened connection
+   * @throws SQLException if the connection cannot be established
+   */
+  private static Connection openConnection() throws SQLException
+  {
+    return DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+  }
+
+  /**
+   * Creates the meta, input and output tables if they do not exist, empties
+   * the input and output tables and inserts the three input rows.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      // One statement is reused for all DDL/DML; it and the connection are closed on exit.
+      try (Connection con = openConnection(); Statement stmt = con.createStatement()) {
+
+        String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+            + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+            + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+            + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+            + ")";
+        stmt.executeUpdate(createMetaTable);
+
+        String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+            + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+        stmt.executeUpdate(createInputTable);
+
+        String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+            + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+        stmt.executeUpdate(createOutputTable);
+
+        stmt.executeUpdate("truncate table " + INPUT_TABLE);
+        // Rows left in the output table by an aborted earlier run would otherwise
+        // be counted by getNumEntries() and could end the run before any output is written.
+        stmt.executeUpdate("truncate table " + OUTPUT_TABLE);
+
+        stmt.executeUpdate("INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)");
+        stmt.executeUpdate("INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)");
+        stmt.executeUpdate("INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)");
+      }
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Drops the input and output tables.
+   */
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      try (Connection con = openConnection(); Statement stmt = con.createStatement()) {
+        stmt.executeUpdate("DROP TABLE " + INPUT_TABLE);
+        stmt.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+      }
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Sets the store, batch size, tuple class and table properties of the
+   * {@code jdbcInput} and {@code jdbcOutput} operators on the given configuration.
+   *
+   * @param conf configuration to populate
+   */
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+    conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+  }
+
+  /**
+   * Counts the distinct (MONTH, MEANTEMP) pairs currently in the output table.
+   *
+   * <p>This is evaluated by the exit condition of the test, so the
+   * connection is closed on every call rather than left open.
+   *
+   * @return number of distinct (MONTH, MEANTEMP) pairs
+   * @throws RuntimeException wrapping the {@link SQLException} if the query fails
+   */
+  public int getNumEntries()
+  {
+    String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+    try (Connection con = openConnection();
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(countQuery)) {
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  /**
+   * Reads the distinct rows of the output table into a map keyed by MONTH.
+   *
+   * @return map from MONTH to MEANTEMP; if a month appears in several rows
+   *         the row read last wins
+   * @throws RuntimeException wrapping the {@link SQLException} if the query fails
+   */
+  public Map<Integer, Double> getMaxMeanTemp()
+  {
+    Map<Integer, Double> result = new HashMap<>();
+    String selectQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+    try (Connection con = openConnection();
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(selectQuery)) {
+      while (resultSet.next()) {
+        result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching max mean temperatures", e);
+    }
+  }
+
+  /**
+   * Runs the application in local mode with an exit condition of two distinct
+   * output rows, then checks the MEANTEMP stored for months 6 and 7 against
+   * {@code MEANTEMPS}.
+   */
+  @Test
+  public void MaxPerKeyExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+
+    MaxPerKeyExamples app = new MaxPerKeyExamples();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumEntries() == 2;
+      }
+    });
+
+    lc.run(5000);
+
+    // Query once, and fail with a readable message instead of a
+    // NullPointerException from unboxing when a month is missing.
+    Map<Integer, Double> maxMeanTemp = getMaxMeanTemp();
+    Assert.assertTrue("no output row for month 6", maxMeanTemp.containsKey(6));
+    Assert.assertTrue("no output row for month 7", maxMeanTemp.containsKey(7));
+
+    double[] result = {maxMeanTemp.get(6), maxMeanTemp.get(7)};
+    Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/data/word.txt b/examples/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/log4j.properties b/examples/highlevelapi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..592eb19
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger: level INFO, attached only to the CONSOLE appender defined below.
+log4j.rootLogger=INFO,CONSOLE

+# Console appender configuration.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=INFO
+#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+# Value for the ${test.log.console.threshold} substitution used by the commented-out line above.
+test.log.console.threshold=WARN

+# Rolling file appender writing to /tmp/app.log; it is defined here but not listed in rootLogger.
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log

+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1

+#log4j.logger.org=INFO

+#log4j.logger.org.apache.commons.beanutils=warn
+# Per-package logger levels.
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/sampletweets.txt b/examples/highlevelapi/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..c113130
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/sampletweets.txt
@@ -0,0 +1,44 @@
+Last week, I published a blog announcing that Apex was accepted as an Apache incubator project.
+This week, I\u2019ll give you a little more detail on what Apache Apex is, and why it\u2019s important.
+
+Apache #Hadoop has been around for over a decade. It has become the de-facto big data platform,
+allowing enterprises to transform their business operations by turning big data into something useful, meaningful,
+and revenue-generating. #Hadoop promised the enablement of big data without incurring the costs you would normally think such powerful
+processing systems would demand. This tremendous promise of transforming business operations continues to fuel high growth in the industry.
+
+It all got started when Hadoop engineers at Yahoo! asked, \u201cHow can we build an efficient search indexing capability?\u201d
+The ensuing iterations and some inspiration resulted in the #MapReduce programming model. Although powerful, MapReduce wasn\u2019t perfect.
+
+Mastering MapReduce required a steep learning curve. Migrating applications to MapReduce required an almost complete rewrite.
+Equally worrisome was the fact that MapReduce had batch processing paradigm and \u201ccompute going to data\u201d at its core,
+thus posing a deterrent to Hadoop realizing its true potential.
+
+Expectedly enough, #MapReduce was an impediment that did little to bolster productization of big data.
+Not to be deterred, there were faster substitutes for MapReduce. Just like Hadoop, these models required deeper expertise, were tough to operate and difficult to master.
+As such, #Hadoop disrupted the way big data needs were handled, but remained largely under-productized.
+
+A decade after Hadoop was started, only a small percentage of big data projects are in production.
+Data is growing rapidly and the ability to harness big data has become a decisive competitive advantage.
+MapReduce impedes this demand (actually more of a scramble) to transform into a data-driven business.
+
+In hindsight, it is clear that in the early days, the subsequent success of Hadoop was not anticipated.
+If they had anticipated Hadoop\u2019s success, the question would have been, \u201cWhat can we do with massively distributed resources?\u201d
+The answer to this question, which came about soon after, was YARN (Hadoop 2.0), the next generation Hadoop.
+For the first time, #YARN brought the capability of exploring how distributed resources handling big data could perform \u201ca lot of things\u201d,
+thus going beyond the early MapReduce paradigm, and in a way beyond batch or even compute-going-to-data paradigms.
+YARN presented the capability to allow big data to not just become big in size, but broader in use cases. With its enabling capability as a Hadoop facilitator,
+YARN has pushed Hadoop towards realizing its true potential. The Hadoop predicament is similar to what
+cellphones would have been without the more popular features such as messaging and internet connectivity.
+
+
+In their early years, cellphones upset the landline market, but did not foster an immediate market furor till
+it transformed into the new-age \u201csmartphone\u201d with impressive features.
+YARN is most certainly the enabling factor for big data dreaming bigger and wider, and with it, Hadoop 2.0 is now a true de-facto distributed operating system.
+
+What\u2019s needed is bleeding edge YARN-based platforms capable of radically realizing Hadoop\u2019s potential
+
+Now is the right time to not only productize big data, but to see how setting it in motion can ensure realization of greater business goals.
+A Herculean task, this demands platforms that are easy to deploy, require nothing beyond everyday IT expertise, can effortlessly integrate with an existing IT infrastructure while ensuring ease of migration.
+The new-age Hadoop platforms need to be designed with an approach to reduce time-to-market by shortening the application lifecycle, from building to launching, thus quickening the realization of revenue for businesses.
+They will also have to reduce time for developers to develop, devOps to operationalize, and finally reduce time to insight for business.
+Platforms such as these will need to learn, adapt, and change to meet the burgeoning needs of the big data world.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/wordcount/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/wordcount/word.txt b/examples/highlevelapi/src/test/resources/wordcount/word.txt
new file mode 100644
index 0000000..edd0f51
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/wordcount/word.txt
@@ -0,0 +1,8 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+bye
\ No newline at end of file