You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:19 UTC
[3/6] apex-malhar git commit: Added Beam Examples and Implementations
of Accumulation.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index 5d4c628..ecd71ae 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -18,19 +18,33 @@
*/
package org.apache.apex.malhar.stream.sample.cookbook;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
/**
* An example that reads the public 'Shakespeare' data, and for each word in
* the dataset that is over a given length, generates a string containing the
@@ -40,12 +54,13 @@ import com.datatorrent.lib.util.KeyValPair;
* key-grouped Collection
*
*/
-public class CombinePerKeyExamples
+@ApplicationAnnotation(name = "CombinePerKeyExamples")
+public class CombinePerKeyExamples implements StreamingApplication
{
// Use the shakespeare public BigQuery sample
private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
// We'll track words >= this word length across all plays in the table.
- private static final int MIN_WORD_LENGTH = 9;
+ private static final int MIN_WORD_LENGTH = 0;
/**
* Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
@@ -76,70 +91,59 @@ public class CombinePerKeyExamples
@Override
public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
{
- return new SampleBean(input.getValue().getKey(), input.getValue().getValue(), null);
+ return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
}
}
-
+
+ /**
+ * A reduce function to concat two strings together.
+ */
+ public static class Concat extends ReduceFn<String>
+ {
+ @Override
+ public String reduce(String input1, String input2)
+ {
+ return input1 + ", " + input2;
+ }
+ }
+
/**
* Reads the public 'Shakespeare' data, and for each word in the dataset
* over a given length, generates a string containing the list of play names
* in which that word appears.
*/
- static class PlaysForWord
- extends CompositeStreamTransform<SampleBean, SampleBean>
+ private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
{
-
+
@Override
- public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+ public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
{
- // fix this later
- return inputStream.map(new ExtractLargeWordsFn())
- .window(new WindowOption.GlobalWindow())
- .reduceByKey(new ReduceFn<String>()
- {
- @Override
- public String defaultAccumulatedValue()
- {
- return "";
- }
-
- @Override
- public String accumulate(String accumulatedValue, String input)
- {
- return accumulatedValue + "," + input;
- }
-
- @Override
- public String merge(String accumulatedValue1, String accumulatedValue2)
- {
- return accumulatedValue1 + "," + accumulatedValue2;
- }
-
- @Override
- public String getOutput(String accumulatedValue)
- {
- return accumulatedValue;
- }
-
- @Override
- public String getRetraction(String value)
- {
- return value;
- }
- }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>()
-
+ return inputStream
+ // Extract words from the input SampleBean stream.
+ .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
+
+ // Apply window and trigger option to the streams.
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
+ .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
{
@Override
public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
{
- return null;
+ return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
}
- })
- .map(new FormatShakespeareOutputFn());
+ }, name("Concat"))
+
+ // Format the output back to a SampleBean object.
+ .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
}
}
-
-
+
+
+ /**
+ * A Java Bean class that contains information about a word that appears in a corpus written by Shakespeare.
+ */
public static class SampleBean
{
@@ -148,17 +152,20 @@ public class CombinePerKeyExamples
}
- public SampleBean(String word, String all_plays, String corpus)
+ public SampleBean(String word, String corpus)
{
this.word = word;
- this.all_plays = all_plays;
this.corpus = corpus;
}
-
+
+ @Override
+ public String toString()
+ {
+ return this.word + " : " + this.corpus;
+ }
+
private String word;
- private String all_plays;
-
private String corpus;
public void setWord(String word)
@@ -180,58 +187,87 @@ public class CombinePerKeyExamples
{
return corpus;
}
-
- public void setAll_plays(String all_plays)
- {
- this.all_plays = all_plays;
- }
-
- public String getAll_plays()
- {
- return all_plays;
- }
}
-
- public static class SampleInput implements InputOperator
+
+ /**
+ * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
+ * data.
+ */
+ public static class SampleInput extends BaseOperator implements InputOperator
{
public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort();
-
- @Override
- public void emitTuples()
+ private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
+ private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
+ private static int i;
+
+ public static int getI()
{
-
+ return i;
}
-
+
@Override
- public void beginWindow(long l)
+ public void setup(Context.OperatorContext context)
{
-
+ super.setup(context);
+ i = 0;
}
-
+
@Override
- public void endWindow()
+ public void emitTuples()
{
-
+ while (i < 1) {
+ for (String word : words) {
+ for (String corpus : corpuses) {
+ beanOutput.emit(new SampleBean(word, corpus));
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Ignore it
+ }
+ }
+ }
+ i++;
+ }
+
}
-
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ static List<SampleBean> result;
+
@Override
public void setup(Context.OperatorContext context)
{
-
+ result = new ArrayList<>();
}
-
- @Override
- public void teardown()
+
+ public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
{
-
- }
+ @Override
+ public void process(SampleBean tuple)
+ {
+ result.add(tuple);
+ }
+ };
}
-
-
- public static void main(String[] args) throws Exception
+
+ /**
+ * Populate dag using High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
{
SampleInput input = new SampleInput();
- StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord());
+ Collector collector = new Collector();
+ StreamFactory.fromInput(input, input.beanOutput, name("input"))
+ .addCompositeStreams(new PlaysForWord())
+ .print()
+ .endWith(collector, collector.input, name("Collector"))
+ .populateDag(dag);
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
new file mode 100644
index 0000000..53426f3
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam DeDupExample.
+ */
+@ApplicationAnnotation(name = "DeDupExample")
+public class DeDupExample implements StreamingApplication
+{
+
+ public static class Collector extends BaseOperator
+ {
+ private static Tuple.WindowedTuple<List<String>> result;
+ private static boolean done = false;
+
+ public static Tuple.WindowedTuple<List<String>> getResult()
+ {
+ return result;
+ }
+
+ public static boolean isDone()
+ {
+ return done;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ result = new Tuple.WindowedTuple<>();
+ done = false;
+ }
+
+ public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<List<String>> tuple)
+ {
+ result = tuple;
+ if (result.getValue().contains("bye")) {
+ done = true;
+ }
+ }
+ };
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Collector collector = new Collector();
+
+ // Create a stream that reads from files in a local folder and outputs lines one by one to downstream.
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+
+ // Extract all the words from the input line of text.
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+ }
+ }, name("ExtractWords"))
+
+ // Change the words to lower case; the word "bye" is later used by the Collector to flag completion.
+ .map(new Function.MapFunction<String, String>()
+ {
+ @Override
+ public String f(String input)
+ {
+ return input.toLowerCase();
+ }
+ }, name("ToLowerCase"));
+
+ // Apply window and trigger option.
+ stream.window(new WindowOption.GlobalWindow(),
+ new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
+
+ // Remove the duplicate words and print out the result.
+ .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input)
+
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
new file mode 100644
index 0000000..3643eab
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * Tuple class for JDBC input of {@link MaxPerKeyExamples}.
+ */
+public class InputPojo extends Object
+{
+ private int month;
+ private int day;
+ private int year;
+ private double meanTemp;
+
+ @Override
+ public String toString()
+ {
+ return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]";
+ }
+
+ public void setMonth(int month)
+ {
+ this.month = month;
+ }
+
+ public int getMonth()
+ {
+ return this.month;
+ }
+
+ public void setDay(int day)
+ {
+ this.day = day;
+ }
+
+ public int getDay()
+ {
+ return day;
+ }
+
+ public void setYear(int year)
+ {
+ this.year = year;
+ }
+
+ public int getYear()
+ {
+ return year;
+ }
+
+ public void setMeanTemp(double meanTemp)
+ {
+ this.meanTemp = meanTemp;
+ }
+
+ public double getMeanTemp()
+ {
+ return meanTemp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..97b2696
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.Max;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * MaxPerKeyExamples Application from Beam
+ */
+@ApplicationAnnotation(name = "MaxPerKeyExamples")
+public class MaxPerKeyExamples implements StreamingApplication
+{
+
+ /**
+ * A map function to extract the mean temperature from {@link InputPojo}.
+ */
+ public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
+ {
+ @Override
+ public KeyValPair<Integer, Double> f(InputPojo row)
+ {
+ Integer month = row.getMonth();
+ Double meanTemp = row.getMeanTemp();
+ return new KeyValPair<Integer, Double>(month, meanTemp);
+ }
+ }
+
+
+ /**
+ * A map function to format output to {@link OutputPojo}.
+ */
+ public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
+ {
+ @Override
+ public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
+ {
+ OutputPojo row = new OutputPojo();
+ row.setMonth(input.getValue().getKey());
+ row.setMeanTemp(input.getValue().getValue());
+ return row;
+ }
+ }
+
+ /**
+ * A composite transformation to perform three tasks:
+ * 1. extract the month and its mean temperature from input pojo.
+ * 2. find the maximum mean temperature for every month.
+ * 3. format the result to an output pojo object.
+ */
+ public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
+ {
+ @Override
+ public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
+ {
+ // InputPojo... => <month, meanTemp> ...
+ WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
+
+ // month, meanTemp... => <month, max mean temp>...
+ WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
+ temps.accumulateByKey(new Max<Double>(),
+ new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
+ {
+ @Override
+ public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
+ {
+ return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
+ }
+ }, name("MaxPerMonth"));
+
+ // <month, max>... => OutputPojo...
+ WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
+
+ return results;
+ }
+ }
+
+ /**
+ * Method to set field info for {@link JdbcPOJOInputOperator}.
+ * @return
+ */
+ private List<FieldInfo> addInputFieldInfos()
+ {
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
+ return fieldInfos;
+ }
+
+ /**
+ * Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
+ * @return
+ */
+ private List<JdbcFieldInfo> addOutputFieldInfos()
+ {
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+ fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
+ return fieldInfos;
+ }
+
+
+ /**
+ * Populate the dag using High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
+ jdbcInput.setFieldInfos(addInputFieldInfos());
+
+ JdbcStore store = new JdbcStore();
+ jdbcInput.setStore(store);
+
+ JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+ jdbcOutput.setFieldInfos(addOutputFieldInfos());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ jdbcOutput.setStore(outputStore);
+
+ // Create stream that reads from a Jdbc Input.
+ ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
+
+ // Apply window and trigger option to the stream.
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo.
+ .map(new Function.MapFunction<Object, InputPojo>()
+ {
+ @Override
+ public InputPojo f(Object input)
+ {
+ return (InputPojo)input;
+ }
+ }, name("ObjectToInputPojo"))
+
+ // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
+ .addCompositeStreams(new MaxMeanTemp())
+
+ // Cast the resulted OutputPojo to Object for Jdbc Output to consume.
+ .map(new Function.MapFunction<OutputPojo, Object>()
+ {
+ @Override
+ public Object f(OutputPojo input)
+ {
+ return (Object)input;
+ }
+ }, name("OutputPojoToObject"))
+
+ // Output the result to Jdbc Output.
+ .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
+
+ stream.populateDag(dag);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
new file mode 100644
index 0000000..db2a09e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}.
+ */
+public class OutputPojo
+{
+ private int month;
+ private double meanTemp;
+
+ @Override
+ public String toString()
+ {
+ return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
+ }
+
+ public void setMonth(int month)
+ {
+ this.month = month;
+ }
+
+ public int getMonth()
+ {
+ return this.month;
+ }
+
+ public void setMeanTemp(double meanTemp)
+ {
+ this.meanTemp = meanTemp;
+ }
+
+ public double getMeanTemp()
+ {
+ return meanTemp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index 903f624..bf23e3a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
@@ -85,31 +86,31 @@ import com.datatorrent.lib.util.KeyValPair;
* Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
* the window duration, until the first pane of non-late data has been emitted, to see more
* interesting results.
- * {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC}
+ * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC}
*
* <p> To see the late data i.e. dropped by the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
- * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
*
* <p>To see the the difference between accumulation mode and discarding mode,
* {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
- * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
- * window DESC, processing_time}
+ * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processingTime}
*
* <p> To see speculative results every minute,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
- * ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processingTime}
*
* <p> To see speculative results every five minutes after the end of the window
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
- * and freeway = "5" ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processingTime}
*
* <p> To see the first and the last pane for a freeway in a window for all the trigger types,
* {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
*
* <p> To reduce the number of results for each query we can add additional where clauses.
* For examples, To see the results of the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
* window = "<enter_window_interval>"}
*
* <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
@@ -135,7 +136,7 @@ public class TriggerExample
* The example uses "freeway" as the key. Event time is the timestamp associated with the data
* element and processing time is the time when the data element gets processed in the pipeline.
* For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
- * Key (freeway) | Value (total_flow) | event time | processing time
+ * Key (freeway) | Value (totalFlow) | event time | processing time
* 5 | 50 | 10:00:03 | 10:00:47
* 5 | 30 | 10:01:00 | 10:01:03
* 5 | 30 | 10:02:00 | 11:07:00
@@ -157,7 +158,7 @@ public class TriggerExample
* close at 10:44:59, when the watermark passes 10:30:00.
*/
static class CalculateTotalFlow
- extends CompositeStreamTransform<String, SampleBean>
+ extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
{
private int windowDuration;
@@ -167,7 +168,7 @@ public class TriggerExample
}
@Override
- public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+ public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
{
// Concept #1: The default triggering behavior
// By default Dataflow uses a trigger which fires when the watermark has passed the end of the
@@ -182,14 +183,14 @@ public class TriggerExample
// The results for the example above with the default trigger and zero allowed lateness
// would be:
- // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
// 5 | 260 | 6 | true | true | ON_TIME
// At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
// result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
// late, and dropped.
-
- ApexStream<SampleBean> defaultTriggerResults = inputStream
+
+ WindowedStream<SampleBean> defaultTriggerResults = inputStream
.window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
new TriggerOption().discardingFiredPanes())
.addCompositeStreams(new TotalFlow("default"));
@@ -205,13 +206,13 @@ public class TriggerExample
// The results for the example above with the default trigger and ONE_DAY allowed lateness
// would be:
- // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
// 5 | 260 | 6 | true | false | ON_TIME
// 5 | 60 | 1 | false | false | LATE
// 5 | 30 | 1 | false | false | LATE
// 5 | 20 | 1 | false | false | LATE
// 5 | 60 | 1 | false | false | LATE
- ApexStream<SampleBean> withAllowedLatenessResults = inputStream
+ WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
.window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
new TriggerOption().discardingFiredPanes(),
Duration.standardDays(1))
@@ -226,7 +227,7 @@ public class TriggerExample
// We also use accumulatingFiredPanes to build up the results across each pane firing.
// The results for the example above for this trigger would be:
- // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
@@ -258,7 +259,7 @@ public class TriggerExample
// Every pane produced will either be EARLY, ON_TIME or LATE.
// The results for the example above for this trigger would be:
- // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
@@ -267,7 +268,7 @@ public class TriggerExample
// 5 | 430 | 10 | false | false | LATE
// For more possibilities of how to build advanced triggers, see {@link Trigger}.
- ApexStream<SampleBean> sequentialResults = inputStream
+ WindowedStream<SampleBean> sequentialResults = inputStream
.window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
// Speculative every ONE_MINUTE
new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
@@ -293,7 +294,7 @@ public class TriggerExample
* objects, to save to BigQuery.
*/
static class TotalFlow extends
- CompositeStreamTransform<String, SampleBean>
+ CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
{
private String triggerType;
@@ -303,13 +304,10 @@ public class TriggerExample
}
@Override
- public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+ public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
{
- if (!(inputStream instanceof WindowedStream)) {
- throw new RuntimeException("Not supported here");
- }
- WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream;
- ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream
+
+ WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
.groupByKey(new ExtractFlowInfo());
return flowPerFreeway
@@ -361,13 +359,13 @@ public class TriggerExample
{
}
- private String trigger_type;
+ private String triggerType;
private String freeway;
- private int total_flow;
+ private int totalFlow;
- private long number_of_records;
+ private long numberOfRecords;
private String window;
@@ -377,9 +375,9 @@ public class TriggerExample
private Date timing;
- private Date event_time;
+ private Date eventTime;
- private Date processing_time;
+ private Date processingTime;
@Override
public boolean equals(Object o)
@@ -391,50 +389,49 @@ public class TriggerExample
return false;
}
SampleBean that = (SampleBean)o;
- return total_flow == that.total_flow &&
- number_of_records == that.number_of_records &&
+ return totalFlow == that.totalFlow &&
+ numberOfRecords == that.numberOfRecords &&
isFirst == that.isFirst &&
isLast == that.isLast &&
- Objects.equals(trigger_type, that.trigger_type) &&
+ Objects.equals(triggerType, that.triggerType) &&
Objects.equals(freeway, that.freeway) &&
Objects.equals(window, that.window) &&
Objects.equals(timing, that.timing) &&
- Objects.equals(event_time, that.event_time) &&
- Objects.equals(processing_time, that.processing_time);
+ Objects.equals(eventTime, that.eventTime) &&
+ Objects.equals(processingTime, that.processingTime);
}
@Override
public int hashCode()
{
return Objects
- .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time,
- processing_time);
+ .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
+ processingTime);
}
- public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window,
- boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time)
+ public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
{
- this.trigger_type = trigger_type;
+ this.triggerType = triggerType;
this.freeway = freeway;
- this.total_flow = total_flow;
- this.number_of_records = number_of_records;
+ this.totalFlow = totalFlow;
+ this.numberOfRecords = numberOfRecords;
this.window = window;
this.isFirst = isFirst;
this.isLast = isLast;
this.timing = timing;
- this.event_time = event_time;
- this.processing_time = processing_time;
+ this.eventTime = eventTime;
+ this.processingTime = processingTime;
}
- public String getTrigger_type()
+ public String getTriggerType()
{
- return trigger_type;
+ return triggerType;
}
- public void setTrigger_type(String trigger_type)
+ public void setTriggerType(String triggerType)
{
- this.trigger_type = trigger_type;
+ this.triggerType = triggerType;
}
public String getFreeway()
@@ -447,24 +444,24 @@ public class TriggerExample
this.freeway = freeway;
}
- public int getTotal_flow()
+ public int getTotalFlow()
{
- return total_flow;
+ return totalFlow;
}
- public void setTotal_flow(int total_flow)
+ public void setTotalFlow(int totalFlow)
{
- this.total_flow = total_flow;
+ this.totalFlow = totalFlow;
}
- public long getNumber_of_records()
+ public long getNumberOfRecords()
{
- return number_of_records;
+ return numberOfRecords;
}
- public void setNumber_of_records(long number_of_records)
+ public void setNumberOfRecords(long numberOfRecords)
{
- this.number_of_records = number_of_records;
+ this.numberOfRecords = numberOfRecords;
}
public String getWindow()
@@ -507,24 +504,24 @@ public class TriggerExample
this.timing = timing;
}
- public Date getEvent_time()
+ public Date getEventTime()
{
- return event_time;
+ return eventTime;
}
- public void setEvent_time(Date event_time)
+ public void setEventTime(Date eventTime)
{
- this.event_time = event_time;
+ this.eventTime = eventTime;
}
- public Date getProcessing_time()
+ public Date getProcessingTime()
{
- return processing_time;
+ return processingTime;
}
- public void setProcessing_time(Date processing_time)
+ public void setProcessingTime(Date processingTime)
{
- this.processing_time = processing_time;
+ this.processingTime = processingTime;
}
}
@@ -532,10 +529,10 @@ public class TriggerExample
* Extract the freeway and total flow in a reading.
* Freeway is used as key since we are calculating the total flow for each freeway.
*/
- static class ExtractFlowInfo implements Function.MapFunction<String, KeyValPair<String, Integer>>
+ static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
{
@Override
- public KeyValPair<String, Integer> f(String input)
+ public Tuple<KeyValPair<String, Integer>> f(String input)
{
String[] laneInfo = input.split(",");
if (laneInfo[0].equals("timestamp")) {
@@ -553,7 +550,7 @@ public class TriggerExample
if (totalFlow == null || totalFlow <= 0) {
return null;
}
- return new KeyValPair<>(freeway, totalFlow);
+ return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/resources/META-INF/properties.xml b/demos/highlevelapi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..ead0460
--- /dev/null
+++ b/demos/highlevelapi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!--
+ <property>
+ <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+ </property>
+ -->
+
+ <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work -->
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name>
+ <value></value>
+ </property>
+
+ <!-- Properties for StreamingWordExtract -->
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name>
+ <value>Test</value>
+ </property>
+
+ <!-- Properties for MaxPerKeyExamples -->
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name>
+ <value>InputTable</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name>
+ <value>SELECT * FROM InputTable;</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name>
+ <value>root</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+ <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+ <property>
+ <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name>
+ <value>OutputTable</value>
+ </property>
+
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
new file mode 100644
index 0000000..101953f
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Runs the {@link MinimalWordCount} application on a local cluster and verifies
+ * the counts recorded by its collector for a few selected words.
+ */
+public class MinimalWordCountTest
+{
+  /**
+   * Exit condition for the local cluster: true once the collector of
+   * {@link MinimalWordCount} reports that it is done.
+   */
+  private static class CollectorDone implements Callable<Boolean>
+  {
+    @Override
+    public Boolean call() throws Exception
+    {
+      return MinimalWordCount.Collector.isDone();
+    }
+  }
+
+  @Test
+  public void MinimalWordCountTest() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration configuration = new Configuration(false);
+    MinimalWordCount application = new MinimalWordCount();
+    localMode.prepareDAG(application, configuration);
+
+    LocalMode.Controller controller = localMode.getController();
+    // The controller is cast to the local cluster implementation to install the exit condition.
+    StramLocalCluster cluster = (StramLocalCluster)controller;
+    cluster.setExitCondition(new CollectorDone());
+
+    // Run for at most 10 seconds; the exit condition may end the run earlier.
+    controller.run(10000);
+
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
new file mode 100644
index 0000000..952356f
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link WindowedWordCount} application. The application is run in local mode with an
+ * empty configuration until its text input reports that it is done. The test then checks the sum
+ * of all counts collected, and the summed counts of a few individual words.
+ */
+public class WindowedWordCountTest
+{
+ @Test
+ public void WindowedWordCountTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new WindowedWordCount(), conf);
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return WindowedWordCount.TextInput.isDone();
+ }
+ });
+
+ lc.run(60000);
+
+ Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
+ Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
+ Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
+ Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
+ Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
+ }
+
+ public long countSum(Map<KeyValPair<Long, String>, Long> map)
+ {
+ long sum = 0;
+ for (long count : map.values()) {
+ sum += count;
+ }
+ return sum;
+ }
+
+ public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
+ {
+ long sum = 0;
+ for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) {
+ if (entry.getKey().getValue().equals(word)) {
+ sum += entry.getValue();
+ }
+ }
+ return sum;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
new file mode 100644
index 0000000..dc236f9
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Runs the {@link AutoComplete} application on a local cluster and verifies the
+ * completion candidates gathered by its collector.
+ */
+public class AutoCompleteTest
+{
+
+  /**
+   * Exit condition for the local cluster: true once the tweets input of
+   * {@link AutoComplete} reports that it is done.
+   */
+  private static class InputDone implements Callable<Boolean>
+  {
+    @Override
+    public Boolean call() throws Exception
+    {
+      return AutoComplete.TweetsInput.isDone();
+    }
+  }
+
+  @Test
+  public void AutoCompleteTest() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration configuration = new Configuration(false);
+    localMode.prepareDAG(new AutoComplete(), configuration);
+
+    LocalMode.Controller controller = localMode.getController();
+    ((StramLocalCluster)controller).setExitCondition(new InputDone());
+
+    // Run for at most 200 seconds; the exit condition may end the run earlier.
+    controller.run(200000);
+
+    // Prefixes that must be present, and the top candidate for "china".
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi"));
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china"));
+    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("china").get(0).getCount());
+    Assert.assertEquals("China", AutoComplete.Collector.getResult().get("china").get(0).getValue());
+
+    // Number of candidates per prefix.
+    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("d").size());
+    Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size());
+
+    // Candidates for "f" must be listed with non-increasing counts.
+    Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount()
+        >= AutoComplete.Collector.getResult().get("f").get(1).getCount());
+    Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount()
+        >= AutoComplete.Collector.getResult().get("f").get(2).getCount());
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
new file mode 100644
index 0000000..bf9b030
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing StreamingWordExtract application
+ */
+public class StreamingWordExtractTest
+{
+ private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent";
+ private static final String DB_DRIVER = "org.h2.Driver";
+ private static final String DB_URL = "jdbc:h2:~/test";
+ private static final String TABLE_NAME = "Test";
+ private static final String USER_NAME = "root";
+ private static final String PSW = "password";
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + "(STRINGVALUE VARCHAR(255))";
+ stmt.executeUpdate(createTable);
+
+ } catch (Throwable e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @After
+ public void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+ String dropTable = "drop table " + TABLE_NAME;
+ stmt.executeUpdate(dropTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setConfig(Configuration conf)
+ {
+ conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+ conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+ conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+ conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
+ }
+
+ public int getNumOfEventsInStore()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(*) from " + TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ @Test
+ public void StreamingWordExtractTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ setConfig(conf);
+ StreamingWordExtract app = new StreamingWordExtract();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return getNumOfEventsInStore() == 36;
+ }
+ });
+
+ lc.run(10000);
+
+ Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
+ Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
new file mode 100644
index 0000000..f8ec086
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TopWikipediaSessions} Application.
+ *
+ * The application is run on a local cluster until its session generator has emitted at least
+ * 250 tuples; every list gathered by the collector must then be in non-increasing order.
+ */
+public class TopWikipediaSessionsTest
+{
+  @Test
+  public void TopWikipediaSessionsTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new TopWikipediaSessions(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
+      }
+    });
+
+    // Run for at most 30 seconds; the exit condition may end the run earlier.
+    lc.run(30000);
+
+    for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
+      Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
+    }
+  }
+
+  /**
+   * Checks that the wrapped values of the list never increase from one element to the next.
+   *
+   * @param input the list to check; empty and single-element lists are trivially in order
+   * @return {@code false} if any element's value is smaller than its successor's, else {@code true}
+   */
+  public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
+  {
+    // Compare every adjacent pair, including the last one. The previous bound of size() - 2
+    // skipped the final pair, so a two-element list was never checked at all.
+    for (int i = 0; i < input.size() - 1; i++) {
+      if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
new file mode 100644
index 0000000..e363ca7
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TrafficRoutes} Application.
+ */
+public class TrafficRoutesTest
+{
+
+ @Test
+ public void TrafficRoutesTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new TrafficRoutes(), conf);
+ LocalMode.Controller lc = lma.getController();
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return TrafficRoutes.InfoGen.getTupleCount() >= 100;
+ }
+ });
+
+ lc.run(60000);
+
+ Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty());
+ for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
+ Assert.assertTrue(entry.getValue().getKey() <= 75);
+ Assert.assertTrue(entry.getValue().getKey() >= 55);
+ Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
new file mode 100644
index 0000000..9ba2f25
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the
+ * following properties for the application before running it:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ *
+ * This test is mainly for local demonstration purposes and is therefore marked {@code @Ignore}.
+ * The default time to run the application is 1 minute; please set the time you need before you run it.
+ */
+public class TwitterAutoCompleteTest
+{
+  // Bound to this class so that log output is attributed to the test that produced it.
+  private static final Logger logger = LoggerFactory.getLogger(TwitterAutoCompleteTest.class);
+
+  /**
+   * Runs TwitterAutoComplete in local mode for a fixed duration and logs the elapsed wall-clock time.
+   * No assertions are made; the test only demonstrates that the application can be launched.
+   *
+   * @throws Exception if preparing or running the local cluster fails
+   */
+  @Test
+  @Ignore
+  public void TwitterAutoCompleteTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    //uncomment the following lines and change YOUR_XXX to the corresponding information needed.
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+    lma.prepareDAG(new TwitterAutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    long start = System.currentTimeMillis();
+    lc.run(60000); // Set your desired time to run the application here.
+    long time = System.currentTimeMillis() - start;
+    logger.info("Test used {} ms", time);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..5858013
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link CombinePerKeyExamples}.
+ */
+public class CombinePerKeyExamplesTest
+{
+ @Test
+ public void CombinePerKeyExamplesTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+
+ CombinePerKeyExamples app = new CombinePerKeyExamples();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return CombinePerKeyExamples.SampleInput.getI() >= 1;
+ }
+ });
+ lc.run(100000);
+
+ Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..ed4ddb4
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.stram.StramLocalCluster;
+
+
+/**
+ * Test for {@link DeDupExample}.
+ */
+public class DeDupExampleTest
+{
+ @Test
+ public void DeDupExampleTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+
+ DeDupExample app = new DeDupExample();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return DeDupExample.Collector.isDone();
+ }
+ });
+ lc.run(50000);
+
+ Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..51981de
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application.
+ */
+public class MaxPerKeyExamplesTest
+{
+
+ private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+ private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+ private static final String DB_DRIVER = "org.h2.Driver";
+ private static final String DB_URL = "jdbc:h2:~/test";
+ private static final String INPUT_TABLE = "InputTable";
+ private static final String OUTPUT_TABLE = "OutputTable";
+ private static final String USER_NAME = "root";
+ private static final String PSW = "password";
+ private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+
+ private static final double[] MEANTEMPS = {85.3, 75.4};
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+ + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+ stmt.executeUpdate(createInputTable);
+
+ String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+ + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+ stmt.executeUpdate(createOutputTable);
+
+ String cleanTable = "truncate table " + INPUT_TABLE;
+ stmt.executeUpdate(cleanTable);
+
+ stmt = con.createStatement();
+
+ String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
+ stmt.executeUpdate(sql);
+ sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
+ stmt.executeUpdate(sql);
+ sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
+ stmt.executeUpdate(sql);
+
+ } catch (Throwable e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+ Statement stmt = con.createStatement();
+
+ String dropInputTable = "DROP TABLE " + INPUT_TABLE;
+ stmt.executeUpdate(dropInputTable);
+
+ String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
+ stmt.executeUpdate(dropOutputTable);
+
+ } catch (Throwable e) {
+ throw Throwables.propagate(e);
+ }
+
+ }
+
+ public void setConfig(Configuration conf)
+ {
+ conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+ conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+ conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+ conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+ conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+ conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+ conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+
+ conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+ conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+ conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+ conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+ conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+ }
+
+ public int getNumEntries()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ public Map<Integer, Double> getMaxMeanTemp()
+ {
+ Map<Integer, Double> result = new HashMap<>();
+ Connection con;
+ try {
+ con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ while (resultSet.next()) {
+ result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ @Test
+ public void MaxPerKeyExampleTest() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ setConfig(conf);
+
+ MaxPerKeyExamples app = new MaxPerKeyExamples();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return getNumEntries() == 2;
+ }
+ });
+
+ lc.run(5000);
+
+ double[] result = new double[2];
+ result[0] = getMaxMeanTemp().get(6);
+ result[1] = getMaxMeanTemp().get(7);
+ Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/data/word.txt b/demos/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error