You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:10 UTC
[05/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
new file mode 100644
index 0000000..31337c1
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.OutputCollector;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * {@link OutputCollector} that buffers each collected pair in memory, deep-copying the key and value on the way in.
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("unchecked")
+public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V>
+{
+  private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
+
+  private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>(); // holds copies made by cloneObj
+
+  public List<KeyHashValPair<K, V>> getList()
+  {
+    return list; // the backing list itself, not a copy
+  }
+
+  private transient SerializationFactory serializationFactory;
+  private transient Configuration conf = null; // collect() rebuilds conf and the factory when this is null
+
+  public OutputCollectorImpl()
+  {
+    conf = new Configuration();
+    serializationFactory = new SerializationFactory(conf);
+  }
+
+  // Deep-copies t via serialize/deserialize. An in-memory byte array is used instead of a piped stream pair
+  // because a pipe written and read on one thread blocks forever once the payload outgrows the pipe buffer.
+  private <T> T cloneObj(T t) throws IOException
+  {
+    Class<T> type = (Class<T>)t.getClass();
+    java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
+    Serializer<T> serializer = serializationFactory.getSerializer(type);
+    serializer.open(bytes);
+    try {
+      serializer.serialize(t);
+    } finally {
+      serializer.close(); // closed before reading back so nothing stays buffered in the serializer
+    }
+    Deserializer<T> deserializer = serializationFactory.getDeserializer(type);
+    deserializer.open(new java.io.ByteArrayInputStream(bytes.toByteArray()));
+    try {
+      return deserializer.deserialize(null); // null asks the deserializer to create a new object
+    } finally {
+      deserializer.close();
+    }
+  }
+
+  @Override
+  public void collect(K arg0, V arg1) throws IOException
+  {
+    if (conf == null) {
+      conf = new Configuration();
+      serializationFactory = new SerializationFactory(conf);
+    }
+    list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
new file mode 100644
index 0000000..cdc5ec9
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * Operator that groups incoming key/value pairs by key and, once the mapper count reaches zero, runs a Hadoop {@link Reducer} over each group.
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings({ "deprecation", "unused" })
+public class ReduceOperator<K1, V1, K2, V2> implements Operator
+{
+  private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
+
+  private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
+  private transient Reducer<K1, V1, K2, V2> reduceObj;
+  private transient Reporter reporter;
+  private OutputCollector<K2, V2> outputCollector;
+  private String configFile;
+
+  public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass()
+  {
+    return reduceClass;
+  }
+
+  public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass)
+  {
+    this.reduceClass = reduceClass;
+  }
+
+  public String getConfigFile()
+  {
+    return configFile;
+  }
+
+  public void setConfigFile(String configFile)
+  {
+    this.configFile = configFile;
+  }
+
+  private int numberOfMappersRunning = -1; // -1: no count tuple seen since the last flush
+  private int operatorId;
+
+  // Receives adjustments to the mapper count; the first one after a flush replaces the -1 sentinel.
+  public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<Integer, Integer> tuple)
+    {
+      logger.info("processing {}", tuple);
+      if (numberOfMappersRunning == -1) {
+        numberOfMappersRunning = tuple.getValue();
+      } else {
+        numberOfMappersRunning += tuple.getValue();
+      }
+    }
+  };
+
+  public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+  private Map<K1, List<V1>> cacheObject; // values received so far, grouped by key
+
+  // Buffers each incoming pair under its key until endWindow() reduces the groups.
+  public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K1, V1> tuple)
+    {
+      List<V1> list = cacheObject.get(tuple.getKey());
+      if (list == null) {
+        list = new ArrayList<V1>();
+        cacheObject.put(tuple.getKey(), list);
+      }
+      list.add(tuple.getValue());
+    }
+  };
+
+  /** Creates the reporter, cache and collector and, when a reduce class is set, instantiates and configures the reducer. */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    reporter = new ReporterImpl(ReporterImpl.ReporterType.Reducer, new Counters());
+    if (context != null) {
+      operatorId = context.getId();
+    }
+    // NOTE(review): cacheObject and outputCollector are non-transient fields, yet both are replaced on every
+    // setup() call; confirm whether state already present on the operator is meant to be discarded here.
+    cacheObject = new HashMap<K1, List<V1>>();
+    outputCollector = new OutputCollectorImpl<K2, V2>();
+    if (reduceClass != null) {
+      try {
+        reduceObj = reduceClass.newInstance();
+      } catch (Exception e) {
+        logger.error("can't instantiate object {}", e.getMessage(), e);
+        throw new RuntimeException(e);
+      }
+      Configuration conf = new Configuration();
+      InputStream stream = null;
+      if (configFile != null && configFile.length() > 0) {
+        // NOTE(review): ClassLoader resource names normally carry no leading '/'; confirm the first lookup can ever succeed.
+        logger.info("system /{}", configFile);
+        stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+        if (stream == null) {
+          logger.info("system {}", configFile);
+          stream = ClassLoader.getSystemResourceAsStream(configFile);
+        }
+      }
+      if (stream != null) {
+        logger.info("found our stream... so adding it");
+        conf.addResource(stream);
+      }
+      reduceObj.configure(new JobConf(conf));
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    // nothing to release
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // no per-window initialisation
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (numberOfMappersRunning == 0) { // count has dropped to zero: reduce every group, emit, then reset
+      for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
+        try {
+          reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
+        } catch (IOException e1) {
+          logger.error("reduce failed for key {}", e.getKey(), e1);
+          throw new RuntimeException(e1);
+        }
+      }
+      List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+      for (KeyHashValPair<K2, V2> e : list) {
+        output.emit(e);
+      }
+      list.clear();
+      cacheObject.clear();
+      numberOfMappersRunning = -1;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
new file mode 100644
index 0000000..cfbb26e
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * {@link Reporter} backed by an optional {@link Counters} instance; status and progress calls are ignored.
+ *
+ * @since 0.9.0
+ */
+public class ReporterImpl implements Reporter
+{
+ private Counters counters; // may be null: increments are then dropped and counter lookups return null
+ InputSplit inputSplit;
+
+ public enum ReporterType
+ {
+ Mapper, Reducer
+ }
+
+ private ReporterType typ;
+
+ public ReporterImpl(final ReporterType kind, final Counters ctrs)
+ {
+ this.typ = kind;
+ this.counters = ctrs;
+ }
+
+ @Override
+ public InputSplit getInputSplit()
+ {
+ if (typ == ReporterType.Reducer) { // only non-reducer reporters expose a split
+ throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
+ } else {
+ return inputSplit;
+ }
+ }
+
+ public void setInputSplit(InputSplit inputSplit)
+ {
+ this.inputSplit = inputSplit;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount)
+ {
+ if (null != counters) {
+ counters.incrCounter(key, amount);
+ }
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount)
+ {
+ if (null != counters) {
+ counters.incrCounter(group, counter, amount);
+ }
+ }
+
+ @Override
+ public void setStatus(String status)
+ {
+ // do nothing.
+ }
+
+ @Override
+ public void progress()
+ {
+ // do nothing.
+ }
+
+ @Override
+ public Counter getCounter(String group, String name)
+ {
+ Counters.Counter counter = null; // stays null when no Counters instance was supplied
+ if (counters != null) {
+ counter = counters.findCounter(group, name);
+ }
+
+ return counter;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> key)
+ {
+ Counters.Counter counter = null; // stays null when no Counters instance was supplied
+ if (counters != null) {
+ counter = counters.findCounter(key);
+ }
+
+ return counter;
+ }
+
+ public float getProgress()
+ {
+ return 0; // progress is not tracked; a constant is reported
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
new file mode 100644
index 0000000..31ce3a9
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * Word count written against the Hadoop mapred API: a tokenizing mapper, a summing reducer and a job driver.
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("deprecation")
+public class WordCount
+{
+
+ public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
+ {
+ private static final IntWritable one = new IntWritable(1);
+ private Text word = new Text(); // single instance, overwritten for every emitted token
+
+ public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
+ {
+ String line = value.toString();
+ StringTokenizer tokenizer = new StringTokenizer(line); // splits on StringTokenizer's default delimiters
+ while (tokenizer.hasMoreTokens()) {
+ word.set(tokenizer.nextToken());
+ output.collect(word, one); // emits (token, 1) for each token of the line
+ }
+ }
+ }
+
+ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
+ {
+ public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
+ {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(key, new IntWritable(sum)); // emits (key, total of all values seen for it)
+ }
+ }
+
+ public void run(String[] args) throws Exception
+ {
+
+ JobConf conf = new JobConf(this.getClass());
+ conf.setJobName("wordcount");
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(Map.class);
+ conf.setCombinerClass(Reduce.class); // the reducer class doubles as the combiner
+ conf.setReducerClass(Reduce.class);
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(args[0])); // args[0]: input path
+ FileOutputFormat.setOutputPath(conf, new Path(args[1])); // args[1]: output path
+
+ JobClient.runJob(conf);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ new WordCount().run(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/resources/META-INF/properties.xml b/examples/mroperator/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..5a56014
--- /dev/null
+++ b/examples/mroperator/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Log Count Example -->
+ <property>
+ <name>dt.application.LogsCountExample.operator.Mapper.dirName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.LogsCountExample.operator.Mapper.partitionCount</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.LogsCountExample.operator.Console.filePath</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.LogsCountExample.operator.Console.outputFileName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.LogsCountExample.operator.Reducer.attr.PARTITIONER</name>
+ <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+ </property>
+
+ <!-- Word Count Example -->
+ <property>
+ <name>dt.application.WordCountExample.operator.Mapper.dirName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.WordCountExample.operator.Mapper.partitionCount</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.WordCountExample.operator.Console.filePath</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.WordCountExample.operator.Console.outputFileName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.WordCountExample.operator.Reducer.attr.PARTITIONER</name>
+ <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+ </property>
+
+ <!-- Inverted Index Example -->
+ <property>
+ <name>dt.application.InvertedIndexExample.operator.Mapper.dirName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.InvertedIndexExample.operator.Mapper.partitionCount</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.InvertedIndexExample.operator.Console.filePath</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.InvertedIndexExample.operator.Console.outputFileName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dt.application.InvertedIndexExample.operator.Reducer.attr.PARTITIONER</name>
+ <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
new file mode 100644
index 0000000..073d847
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@SuppressWarnings("deprecation")
+public class MapOperatorTest
+{
+
+ private static Logger LOG = LoggerFactory.getLogger(MapOperatorTest.class);
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+ /**
+ * Drives a MapOperator configured with the WordCount mapper and combiner and checks how many tuples it emits.
+ */
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ testNodeProcessingSchema(new MapOperator<LongWritable, Text, Text, IntWritable>());
+ }
+
+ public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException
+ {
+
+ CollectorTestSink sortSink = new CollectorTestSink();
+ oper.output.setSink(sortSink);
+
+ oper.setMapClass(WordCount.Map.class);
+ oper.setCombineClass(WordCount.Reduce.class);
+ oper.setDirName(testMeta.testDir);
+ oper.setConfigFile(null);
+ oper.setInputFormatClass(TextInputFormat.class);
+
+ Configuration conf = new Configuration();
+ JobConf jobConf = new JobConf(conf);
+ FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir));
+ TextInputFormat inputFormat = new TextInputFormat();
+ inputFormat.configure(jobConf);
+ InputSplit[] splits = inputFormat.getSplits(jobConf, 1); // splits of the directory prepared by TestMeta
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
+ keySerializer.open(oper.getOutstream()); // the first split is serialized into the stream the operator exposes
+ keySerializer.serialize(splits[0]);
+ oper.setInputSplitClass(splits[0].getClass());
+ keySerializer.close();
+ oper.setup(null);
+ oper.beginWindow(0);
+ oper.emitTuples();
+ oper.emitTuples();
+ oper.endWindow();
+ oper.beginWindow(1);
+ oper.emitTuples();
+ oper.endWindow();
+
+ // NOTE(review): 3 presumably corresponds to the three distinct words in the test file after combining; confirm against MapOperator
+ Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
+ for (Object o : sortSink.collectedTuples) {
+ LOG.debug(o.toString());
+ }
+ LOG.debug("Done testing round\n");
+ oper.teardown();
+ }
+
+ public static class TestMeta extends TestWatcher
+ {
+ public final String file1 = "file1";
+ public String baseDir;
+ public String testDir;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ baseDir = "target/" + className; // one directory per test class, removed again in finished()
+ testDir = baseDir + "/" + methodName; // one sub-directory per test method
+ try {
+ FileUtils.forceMkdir(new File(testDir));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n"); // input: the words 1, 2 and 3, each twice
+ }
+
+ private void createFile(String fileName, String data)
+ {
+ BufferedWriter output = null;
+ try {
+ output = new BufferedWriter(new FileWriter(new File(fileName)));
+ output.write(data);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ if (output != null) {
+ try {
+ output.close();
+ } catch (IOException ex) {
+ LOG.error("not able to close the output stream: ", ex); // a failed close is logged, not rethrown
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ try {
+ FileUtils.deleteDirectory(new File(baseDir));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
new file mode 100644
index 0000000..bff982a
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+public class ReduceOperatorTest
+{
+ private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
+
+ /**
+ * Feeds word/count pairs over two windows and checks that the operator emits one tuple per distinct word.
+ */
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper)
+ {
+ oper.setReduceClass(WordCount.Reduce.class);
+ oper.setConfigFile(null);
+ oper.setup(null);
+
+ CollectorTestSink sortSink = new CollectorTestSink();
+ oper.output.setSink(sortSink);
+
+ oper.beginWindow(0);
+ oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1)); // count tuple with value +1
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.endWindow();
+
+ oper.beginWindow(1);
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1)); // count tuple with value -1 cancels the earlier +1
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); // two distinct keys were fed: "one" and "two"
+ for (Object o : sortSink.collectedTuples) {
+ logger.debug(o.toString());
+ }
+ logger.debug("Done testing round\n");
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
new file mode 100644
index 0000000..fccfa3b
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.LocalMode;
+
+public class WordCountMRApplicationTest
+{
+ private static Logger LOG = LoggerFactory.getLogger(WordCountMRApplicationTest.class);
+ @Rule
+ public MapOperatorTest.TestMeta testMeta = new MapOperatorTest.TestMeta();
+
+ @Test
+ public void testSomeMethod() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.application.WordCountExample.operator.Mapper.dirName", testMeta.testDir);
+ conf.setInt("dt.application.WordCountExample.operator.Mapper.partitionCount", 1);
+ conf.set("dt.application.WordCountExample.operator.Console.filePath", testMeta.testDir);
+ conf.set("dt.application.WordCountExample.operator.Console.outputFileName", "output.txt");
+ lma.prepareDAG(new NewWordCountApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+ lc.run(5000);
+ lc.shutdown();
+ List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt"));
+ Map<String,Integer> readMap = Maps.newHashMap();
+ Iterator<String> itr = readLines.iterator();
+ while (itr.hasNext()) {
+ String[] splits = itr.next().split("="); // each output line is parsed as word=count
+ readMap.put(splits[0],Integer.valueOf(splits[1]));
+ }
+ Map<String,Integer> expectedMap = Maps.newHashMap(); // TestMeta's input file holds 1, 2 and 3, each twice
+ expectedMap.put("1",2);
+ expectedMap.put("2",2);
+ expectedMap.put("3",2);
+ Assert.assertEquals("expected reduced data ", expectedMap, readMap);
+ LOG.info("read lines {}", readLines);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/resources/log4j.properties b/examples/mroperator/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mroperator/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pi/pom.xml b/examples/pi/pom.xml
new file mode 100644
index 0000000..6c4935f
--- /dev/null
+++ b/examples/pi/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-pi</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Pi Example</name>
+ <description>Apex example applications that calculate the value of Pi. This is a starting point to understand how Apex works.</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>6.6.4</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/assemble/appPackage.xml b/examples/pi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/pi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
new file mode 100644
index 0000000..45f2b37
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation Example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiExample")
+public class Application implements StreamingApplication
+{
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator numbers = dag.addOperator("rand", new RandomEventGenerator());
+    PiCalculateOperator estimator = dag.addOperator("picalc", new PiCalculateOperator());
+    ConsoleOutputOperator sink = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("rand_calc", numbers.integer_data, estimator.input).setLocality(locality);
+    // pi estimates -> console; note that despite its name this stream starts at the estimator
+    dag.addStream("rand_console", estimator.output, sink.input).setLocality(locality);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
new file mode 100644
index 0000000..fbd196a
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Very similar to PiExample but data is also written to an App Data operator for visualization.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiExampleAppData")
+public class ApplicationAppData implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "PiExampleDataSchema.json";
+
+  private final Locality locality = null;
+
+  /**
+   * Feeds the pi estimates to the console and to a snapshot server whose query and result
+   * operators use the gateway's pubsub websocket.
+   *
+   * @throws RuntimeException if the DAG has no GATEWAY_CONNECT_ADDRESS value
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Checked first so that a missing gateway address fails before any operator is added.
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+    if (StringUtils.isEmpty(gatewayAddress)) {
+      throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
+    }
+    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
+    dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
+
+    AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
+    String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+    snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+
+    // The query operator is not added to the DAG; it is embedded in the snapshot server.
+    PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+    wsQuery.enableEmbeddedMode();
+    snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
+    PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+    wsQuery.setUri(uri);
+    wsResult.setUri(uri);
+    Operator.InputPort<String> queryResultPort = wsResult.input;
+
+    NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);
+    dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
+    dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
new file mode 100644
index 0000000..8a9cc50
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.script.JavaScriptOperator;
+import com.datatorrent.lib.stream.RoundRobinHashMap;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula. This example inputs formula using java script operator.
+ *
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console:
+ * <pre>
+ * 2013-06-25 11:44:25,842 [container-2] DEBUG stram.StramChildAgent updateOperatorStatus - container-2 pendingDeploy []
+ * 2013-06-25 11:44:25,929 [ServerHelper-1-1] DEBUG netlet.AbstractClient send - allocating new sendBuffer4Offers of size 16384 for Server.Subscriber{type=rrhm_calc/3.inBindings, mask=0, partitions=null}
+ * 3.16
+ * 3.15
+ * 3.1616
+ * 3.148
+ * 3.1393846153846154
+ * </pre>
+ *
+ * * Application DAG : <br>
+ * <img src="doc-files/ApplicationScript.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator} <br>
+ * StateFull : No</li>
+ * <li><b>The rrhm Operator : </b> This operator takes input from the random generator and
+ * creates tuples of (x,y) in round robin fashion. <br>
+ * Class : {@link com.datatorrent.lib.stream.RoundRobinHashMap} <br>
+ * StateFull : Yes, tuple is emitted after (x, y) values have been aggregated.</li>
+ * <li><b>The calc operator : </b> This is a JavaScript operator implementing the pi estimation formula. <br>
+ * Class : {@link com.datatorrent.lib.script.JavaScriptOperator} <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). User can use any output adapter. <br>
+ * .</li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiJavaScriptExample")
+public class ApplicationWithScript implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final int upperBound = 30000;
+
+    // random integers, configured with minimum 0 and maximum upperBound
+    RandomEventGenerator generator = dag.addOperator("rand", new RandomEventGenerator());
+    generator.setMinvalue(0);
+    generator.setMaxvalue(upperBound);
+
+    // turns the incoming values into maps keyed by "x" and "y"
+    RoundRobinHashMap<String, Object> pairer = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
+    pairer.setKeys(new String[]{"x", "y"});
+
+    // in the script, i counts the points inside the circle and count counts all points
+    String piFunction = "function pi() { if (x*x+y*y <= " + upperBound * upperBound + ") { i++; } count++; return i / count * 4; }";
+    JavaScriptOperator script = dag.addOperator("picalc", new JavaScriptOperator());
+    script.setPassThru(false);
+    script.put("i",0);
+    script.put("count",0);
+    script.addSetupScript(piFunction);
+    script.setInvoke("pi");
+
+    dag.addStream("rand_rrhm", generator.integer_data, pairer.data);
+    dag.addStream("rrhm_calc", pairer.map, script.inBindings);
+
+    ConsoleOutputOperator sink = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("rand_console", script.result, sink.input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
new file mode 100644
index 0000000..672a931
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.math.Division;
+import com.datatorrent.lib.math.LogicalCompareToConstant;
+import com.datatorrent.lib.math.MultiplyByConstant;
+import com.datatorrent.lib.math.RunningAverage;
+import com.datatorrent.lib.math.Sigma;
+import com.datatorrent.lib.math.SquareCalculus;
+import com.datatorrent.lib.stream.AbstractAggregator;
+import com.datatorrent.lib.stream.ArrayListAggregator;
+import com.datatorrent.lib.stream.Counter;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Estimates pi from random points with a DAG assembled from library operators only.
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiLibraryExample")
+public class Calculator implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    /* endless source of random values between 0 and 30000 */
+    RandomEventGenerator source = dag.addOperator("GenerateX", RandomEventGenerator.class);
+
+    /* squares every value it receives */
+    SquareCalculus squarer = dag.addOperator("SquareX", SquareCalculus.class);
+
+    /* pairs consecutive squares and adds each pair up */
+    AbstractAggregator<Integer> pairer = dag.addOperator("PairXY", new ArrayListAggregator<Integer>());
+    Sigma<Integer> adder = dag.addOperator("SumXY", new Sigma<Integer>());
+    /* locates each sum; NOTE(review): relies on greaterThan meaning constant > value - confirm */
+    LogicalCompareToConstant<Integer> locator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>());
+    locator.setConstant(30000 * 30000);
+    /* hit and total counters, their ratio times four, and the running average of that product */
+    Counter hits = dag.addOperator("CountInCircle", Counter.class);
+    Counter tries = dag.addOperator("CountInSquare", Counter.class);
+    Division ratio = dag.addOperator("Ratio", Division.class);
+    MultiplyByConstant timesFour = dag.addOperator("InstantPI", MultiplyByConstant.class);
+    timesFour.setMultiplier(4);
+    RunningAverage mean = dag.addOperator("AveragePI", new RunningAverage());
+    ConsoleOutputOperator sink = dag.addOperator("Console", new ConsoleOutputOperator());
+
+    dag.addStream("x", source.integer_data, squarer.input);
+    dag.addStream("sqr", squarer.integerResult, pairer.input);
+    dag.addStream("x2andy2", pairer.output, adder.input);
+    dag.addStream("x2plusy2", adder.integerResult, locator.input, tries.input);
+    dag.addStream("inCirclePoints", locator.greaterThan, hits.input);
+    dag.addStream("numerator", hits.output, ratio.numerator);
+    dag.addStream("denominator", tries.output, ratio.denominator);
+    dag.addStream("ratio", ratio.doubleQuotient, timesFour.input);
+    dag.addStream("instantPi", timesFour.doubleProduct, mean.input);
+    dag.addStream("averagePi", mean.doubleAverage, sink.input);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
new file mode 100644
index 0000000..aef8a0c
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>An operator which converts a raw value to a named value singleton list.</p>
+ * AppDataSnapshotServerMap.input accepts a {@code List<Map<String,Object>>}, so we use this operator to
+ * convert individual values to a singleton list of a named value.
+ *
+ * @displayName Named Value
+ * @tags count
+ * @since 3.2.0
+ */
+public class NamedValueList<T> extends BaseOperator
+{
+  @NotNull
+  private String valueName;
+  // Created in setup() and re-emitted for every tuple; only the value stored under valueName changes.
+  private List<Map<String, T>> valueList;
+  private Map<String, T> valueMap;
+
+  public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T val)
+    {
+      valueMap.put(valueName, val);
+      outPort.emit(valueList);
+    }
+  };
+
+  public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new DefaultOutputPort<>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    valueMap = new HashMap<>();
+    valueMap.put(valueName, null);
+    valueList = Collections.singletonList(valueMap);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public String getValueName()
+  {
+    return valueName;
+  }
+
+  public void setValueName(String name)
+  {
+    valueName = name;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
new file mode 100644
index 0000000..b710a96
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This operator implements Monte Carlo estimation of pi. Consecutive input values are paired into
+ * points (x, y); pi ~= (points with x*x + y*y <= base) / (total number of points) * 4.
+ *
+ * @since 0.3.2
+ */
+public class PiCalculateOperator extends BaseOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(PiCalculateOperator.class);
+  private transient int x = -1; // -1 means that no x value is buffered yet
+  private transient int y = -1;
+  private int base;
+  private long inArea = 0;
+  private long totalArea = 0;
+  public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+  {
+    @Override
+    public void process(Integer tuple)
+    {
+      if (x == -1) {
+        x = tuple;
+      } else {
+        y = tuple;
+        if ((long)x * x + (long)y * y <= base) { // long math: int overflows once inputs exceed 32767
+          inArea++;
+        }
+        totalArea++;
+        x = y = -1;
+      }
+    }
+  };
+  public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<Double>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    logger.info("inArea {} totalArea {}", inArea, totalArea);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (totalArea > 0) { // 0 / 0 would emit NaN before the first complete point
+      output.emit((double)inArea / totalArea * 4);
+    }
+  }
+
+  public void setBase(int num)
+  {
+    base = num;
+  }
+
+  public int getBase()
+  {
+    return base;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif
new file mode 100644
index 0000000..9545c6c
Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif
new file mode 100644
index 0000000..29f9ef8
Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
new file mode 100644
index 0000000..3ebad48
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Pi calculation demonstration application.
+ */
+package org.apache.apex.examples.pi;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/META-INF/properties.xml b/examples/pi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..95df4c7
--- /dev/null
+++ b/examples/pi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- Memory settings for all examples -->
+ <property>
+ <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+ <value>256</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+ <value>-Xmx128M</value>
+ </property>
+ <property>
+ <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+ <value>128</value>
+ </property>
+
+ <!-- PiExample -->
+ <property>
+ <name>dt.application.PiExample.operator.rand.minvalue</name>
+ <value>0</value>
+ </property>
+ <property>
+ <name>dt.application.PiExample.operator.rand.maxvalue</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.PiExample.operator.picalc.base</name>
+ <value>900000000</value>
+ </property>
+ <property>
+ <name>dt.application.PiExample.operator.adaptor.valueName</name>
+ <value>piValue</value>
+ </property>
+
+ <!-- PiExampleAppData -->
+ <property>
+ <name>dt.application.PiExampleAppData.operator.rand.minvalue</name>
+ <value>0</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.rand.maxvalue</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.picalc.base</name>
+ <value>900000000</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.adaptor.valueName</name>
+ <value>piValue</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.Query.topic</name>
+ <value>PiExampleQuery</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.QueryResult.topic</name>
+ <value>PiExampleQueryResult</value>
+ </property>
+ <property>
+ <name>dt.application.PiExampleAppData.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+ <value>PiExampleQuery</value>
+ </property>
+
+ <!-- PiLibraryExample -->
+ <property>
+ <name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name>
+ <value>0</value>
+ </property>
+ <property>
+ <name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.PiLibraryExample.operator.PairXY.size</name>
+ <value>2</value>
+ </property>
+
+ <!-- PiJavaScriptExample -->
+ <property>
+ <name>dt.application.PiJavaScriptExample.operator.rand.tuplesBlast</name>
+ <value>9</value>
+ </property>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/PiExampleDataSchema.json
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/PiExampleDataSchema.json b/examples/pi/src/main/resources/PiExampleDataSchema.json
new file mode 100644
index 0000000..47db8eb
--- /dev/null
+++ b/examples/pi/src/main/resources/PiExampleDataSchema.json
@@ -0,0 +1,3 @@
+{
+ "values": [{"name": "piValue", "type": "double"}]
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/app/PiJsonExample.json
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/app/PiJsonExample.json b/examples/pi/src/main/resources/app/PiJsonExample.json
new file mode 100644
index 0000000..b1e2972
--- /dev/null
+++ b/examples/pi/src/main/resources/app/PiJsonExample.json
@@ -0,0 +1,52 @@
+{
+ "description": "Pi JSON Example that is intended to demonstrate the capability of specifying an app using JSON",
+ "operators": [
+ {
+ "name": "rand",
+ "class": "com.datatorrent.lib.testbench.RandomEventGenerator",
+ "properties": {
+ "minvalue": 0,
+ "maxvalue": 30000
+ }
+ },
+ {
+ "name": "picalc",
+ "class": "org.apache.apex.examples.pi.PiCalculateOperator",
+ "properties": {
+ "base": 900000000
+ }
+ },
+ {
+ "name": "console",
+ "class": "com.datatorrent.lib.io.ConsoleOutputOperator"
+ }
+ ],
+ "streams": [
+ {
+ "name": "rand_calc",
+ "source": {
+ "operatorName": "rand",
+ "portName": "integer_data"
+ },
+ "sinks": [
+ {
+ "operatorName": "picalc",
+ "portName": "input"
+ }
+ ]
+ },
+ {
+ "name": "calc_console",
+ "source": {
+ "operatorName": "picalc",
+ "portName": "output"
+ },
+ "sinks": [
+ {
+ "operatorName": "console",
+ "portName": "input"
+ }
+ ]
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
new file mode 100644
index 0000000..d71288f
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Smoke test for {@link Application}: loads {@code dt-site-pi.xml} into a
+ * {@code Configuration} created with {@code new Configuration(false)}, prepares the
+ * application's DAG in {@link LocalMode} and runs it through the local controller.
+ * The test contains no assertions; it fails only if an exception is thrown.
+ * NOTE(review): the argument to {@code run(10000)} is presumably a run duration in
+ * milliseconds; confirm against the LocalMode API.
+ */
+public class ApplicationTest
+{
+ @Test
+ public void testSomeMethod() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-pi.xml");
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
new file mode 100644
index 0000000..9551c52
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Smoke test for {@link ApplicationWithScript}: hands a new instance to
+ * {@code LocalMode.runApp} together with the value 10000.
+ * The test contains no assertions; it fails only if an exception is thrown.
+ * NOTE(review): 10000 is presumably a run duration in milliseconds; confirm against
+ * the LocalMode API.
+ */
+public class ApplicationWithScriptTest
+{
+ @Test
+ public void testSomeMethod() throws Exception
+ {
+ LocalMode.runApp(new ApplicationWithScript(), 10000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
new file mode 100644
index 0000000..34ad387
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Smoke test for {@link Calculator}: loads {@code dt-site-pilibrary.xml} into a
+ * {@code Configuration} created with {@code new Configuration(false)}, prepares the
+ * application's DAG in {@link LocalMode} and runs it through the local controller.
+ * The test contains no assertions; it fails only if an exception is thrown.
+ * NOTE(review): the argument to {@code run(10000)} is presumably a run duration in
+ * milliseconds; confirm against the LocalMode API.
+ */
+public class CalculatorTest
+{
+ @Test
+ public void testSomeMethod() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-pilibrary.xml");
+ lma.prepareDAG(new Calculator(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pi.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/dt-site-pi.xml b/examples/pi/src/test/resources/dt-site-pi.xml
new file mode 100644
index 0000000..6032400
--- /dev/null
+++ b/examples/pi/src/test/resources/dt-site-pi.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.application.PiExample.class</name>
+ <value>org.apache.apex.examples.pi.Application</value>
+ <description>An alias for the application</description>
+</property>
+<property>
+ <name>dt.application.PiExample.operator.rand.minvalue</name>
+ <value>0</value>
+</property>
+<property>
+ <name>dt.application.PiExample.operator.rand.maxvalue</name>
+ <value>30000</value>
+</property>
+<property>
+ <name>dt.application.PiExample.operator.picalc.base</name>
+ <value>900000000</value>
+</property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pilibrary.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/dt-site-pilibrary.xml b/examples/pi/src/test/resources/dt-site-pilibrary.xml
new file mode 100644
index 0000000..8f1ae8b
--- /dev/null
+++ b/examples/pi/src/test/resources/dt-site-pilibrary.xml
@@ -0,0 +1,45 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.application.PiLibraryExample.class</name>
+ <value>org.apache.apex.examples.pi.Calculator</value>
+ <description>An alias for the application</description>
+</property>
+<property>
+ <name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name>
+ <value>0</value>
+</property>
+<property>
+ <name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name>
+ <value>30000</value>
+</property>
+<property>
+ <name>dt.application.PiLibraryExample.operator.PairXY.size</name>
+ <value>2</value>
+</property>
+<!--
+<property>
+ <name>dt.application.PiLibraryExample.operator.AnalyzeLocation.constant</name>
+ <value>900000000</value>
+</property> -->
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/log4j.properties b/examples/pi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/pi/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug