You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:10 UTC

[05/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
new file mode 100644
index 0000000..31337c1
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.OutputCollector;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * <p>OutputCollectorImpl class.</p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("unchecked")
+public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V>
+{
+  private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
+
+  private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>();
+
+  public List<KeyHashValPair<K, V>> getList()
+  {
+    return list;
+  }
+
+  private transient SerializationFactory serializationFactory;
+  private transient Configuration conf = null;
+
+  public OutputCollectorImpl()
+  {
+    conf = new Configuration();
+    serializationFactory = new SerializationFactory(conf);
+
+  }
+
+  private <T> T cloneObj(T t) throws IOException
+  {
+    Serializer<T> keySerializer;
+    Class<T> keyClass;
+    PipedInputStream pis = new PipedInputStream();
+    PipedOutputStream pos = new PipedOutputStream(pis);
+    keyClass = (Class<T>)t.getClass();
+    keySerializer = serializationFactory.getSerializer(keyClass);
+    keySerializer.open(pos);
+    keySerializer.serialize(t);
+    Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass);
+    keyDesiralizer.open(pis);
+    T clonedArg0 = keyDesiralizer.deserialize(null);
+    pos.close();
+    pis.close();
+    keySerializer.close();
+    keyDesiralizer.close();
+    return clonedArg0;
+
+  }
+
+  @Override
+  public void collect(K arg0, V arg1) throws IOException
+  {
+    if (conf == null) {
+      conf = new Configuration();
+      serializationFactory = new SerializationFactory(conf);
+    }
+    list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
new file mode 100644
index 0000000..cdc5ec9
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * <p>ReduceOperator class.</p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings({ "deprecation", "unused" })
+public class ReduceOperator<K1, V1, K2, V2> implements Operator
+{
+  private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
+
+  private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
+  private transient Reducer<K1, V1, K2, V2> reduceObj;
+  private transient Reporter reporter;
+  private OutputCollector<K2, V2> outputCollector;
+  private String configFile;
+
+  public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass()
+  {
+    return reduceClass;
+  }
+
+  public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass)
+  {
+    this.reduceClass = reduceClass;
+  }
+
+  public String getConfigFile()
+  {
+    return configFile;
+  }
+
+  public void setConfigFile(String configFile)
+  {
+    this.configFile = configFile;
+  }
+
+  private int numberOfMappersRunning = -1;
+  private int operatorId;
+
+  public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<Integer, Integer> tuple)
+    {
+      logger.info("processing {}", tuple);
+      if (numberOfMappersRunning == -1) {
+        numberOfMappersRunning = tuple.getValue();
+      } else {
+        numberOfMappersRunning += tuple.getValue();
+      }
+
+    }
+
+  };
+
+  public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+  private Map<K1, List<V1>> cacheObject;
+  public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K1, V1> tuple)
+    {
+      // logger.info("processing tupple {}",tuple);
+      List<V1> list = cacheObject.get(tuple.getKey());
+      if (list == null) {
+        list = new ArrayList<V1>();
+        list.add(tuple.getValue());
+        cacheObject.put(tuple.getKey(), list);
+      } else {
+        list.add(tuple.getValue());
+      }
+    }
+
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    reporter = new ReporterImpl(ReporterImpl.ReporterType.Reducer, new Counters());
+    if (context != null) {
+      operatorId = context.getId();
+    }
+    cacheObject = new HashMap<K1, List<V1>>();
+    outputCollector = new OutputCollectorImpl<K2, V2>();
+    if (reduceClass != null) {
+      try {
+        reduceObj = reduceClass.newInstance();
+      } catch (Exception e) {
+        logger.info("can't instantiate object {}", e.getMessage());
+        throw new RuntimeException(e);
+      }
+      Configuration conf = new Configuration();
+      InputStream stream = null;
+      if (configFile != null && configFile.length() > 0) {
+        logger.info("system /{}", configFile);
+        stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+        if (stream == null) {
+          logger.info("system {}", configFile);
+          stream = ClassLoader.getSystemResourceAsStream(configFile);
+        }
+      }
+      if (stream != null) {
+        logger.info("found our stream... so adding it");
+        conf.addResource(stream);
+      }
+      reduceObj.configure(new JobConf(conf));
+    }
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (numberOfMappersRunning == 0) {
+      for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
+        try {
+          reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
+        } catch (IOException e1) {
+          logger.info(e1.getMessage());
+          throw new RuntimeException(e1);
+        }
+      }
+      List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+      for (KeyHashValPair<K2, V2> e : list) {
+        output.emit(e);
+      }
+      list.clear();
+      cacheObject.clear();
+      numberOfMappersRunning = -1;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
new file mode 100644
index 0000000..cfbb26e
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * <p>ReporterImpl class.</p>
+ *
+ * @since 0.9.0
+ */
+public class ReporterImpl implements Reporter
+{
+  private Counters counters;
+  InputSplit inputSplit;
+
+  public enum ReporterType
+  {
+    Mapper, Reducer
+  }
+
+  private ReporterType typ;
+
+  public ReporterImpl(final ReporterType kind, final Counters ctrs)
+  {
+    this.typ = kind;
+    this.counters = ctrs;
+  }
+
+  @Override
+  public InputSplit getInputSplit()
+  {
+    if (typ == ReporterType.Reducer) {
+      throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
+    } else {
+      return inputSplit;
+    }
+  }
+
+  public void setInputSplit(InputSplit inputSplit)
+  {
+    this.inputSplit = inputSplit;
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount)
+  {
+    if (null != counters) {
+      counters.incrCounter(key, amount);
+    }
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount)
+  {
+    if (null != counters) {
+      counters.incrCounter(group, counter, amount);
+    }
+  }
+
+  @Override
+  public void setStatus(String status)
+  {
+    // do nothing.
+  }
+
+  @Override
+  public void progress()
+  {
+    // do nothing.
+  }
+
+  @Override
+  public Counter getCounter(String group, String name)
+  {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(group, name);
+    }
+
+    return counter;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> key)
+  {
+    Counters.Counter counter = null;
+    if (counters != null) {
+      counter = counters.findCounter(key);
+    }
+
+    return counter;
+  }
+
+  public float getProgress()
+  {
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
new file mode 100644
index 0000000..31ce3a9
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * <p>WordCount class.</p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings("deprecation")
+public class WordCount
+{
+
+  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
+  {
+    private static final IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
+    {
+      String line = value.toString();
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+
+  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
+  {
+    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
+    {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+
+  public void run(String[] args) throws Exception
+  {
+
+    JobConf conf = new JobConf(this.getClass());
+    conf.setJobName("wordcount");
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(Map.class);
+    conf.setCombinerClass(Reduce.class);
+    conf.setReducerClass(Reduce.class);
+
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+
+    FileInputFormat.setInputPaths(conf, new Path(args[0]));
+    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+    JobClient.runJob(conf);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    new WordCount().run(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/resources/META-INF/properties.xml b/examples/mroperator/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..5a56014
--- /dev/null
+++ b/examples/mroperator/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Log Count Example -->
+  <property>
+    <name>dt.application.LogsCountExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+
+  <!-- Word Count Example -->
+  <property>
+    <name>dt.application.WordCountExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+
+  <!-- Inverted Index Example -->
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
new file mode 100644
index 0000000..073d847
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@SuppressWarnings("deprecation")
+public class MapOperatorTest
+{
+
+  private static Logger LOG = LoggerFactory.getLogger(MapOperatorTest.class);
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new MapOperator<LongWritable, Text, Text, IntWritable>());
+  }
+
+  public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException
+  {
+
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.output.setSink(sortSink);
+
+    oper.setMapClass(WordCount.Map.class);
+    oper.setCombineClass(WordCount.Reduce.class);
+    oper.setDirName(testMeta.testDir);
+    oper.setConfigFile(null);
+    oper.setInputFormatClass(TextInputFormat.class);
+
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf(conf);
+    FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir));
+    TextInputFormat inputFormat = new TextInputFormat();
+    inputFormat.configure(jobConf);
+    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
+    keySerializer.open(oper.getOutstream());
+    keySerializer.serialize(splits[0]);
+    oper.setInputSplitClass(splits[0].getClass());
+    keySerializer.close();
+    oper.setup(null);
+    oper.beginWindow(0);
+    oper.emitTuples();
+    oper.emitTuples();
+    oper.endWindow();
+    oper.beginWindow(1);
+    oper.emitTuples();
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
+      LOG.debug(o.toString());
+    }
+    LOG.debug("Done testing round\n");
+    oper.teardown();
+  }
+
+  public static class TestMeta extends TestWatcher
+  {
+    public final String file1 = "file1";
+    public String baseDir;
+    public String testDir;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      baseDir = "target/" + className;
+      testDir = baseDir + "/" + methodName;
+      try {
+        FileUtils.forceMkdir(new File(testDir));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n");
+    }
+
+    private void createFile(String fileName, String data)
+    {
+      BufferedWriter output = null;
+      try {
+        output = new BufferedWriter(new FileWriter(new File(fileName)));
+        output.write(data);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      } finally {
+        if (output != null) {
+          try {
+            output.close();
+          } catch (IOException ex) {
+            LOG.error("not able to close the output stream: ", ex);
+          }
+        }
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File(baseDir));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
new file mode 100644
index 0000000..bff982a
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+public class ReduceOperatorTest
+{
+  private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
+
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper)
+  {
+    oper.setReduceClass(WordCount.Reduce.class);
+    oper.setConfigFile(null);
+    oper.setup(null);
+
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.output.setSink(sortSink);
+
+    oper.beginWindow(0);
+    oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.endWindow();
+
+    oper.beginWindow(1);
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+    oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1));
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
+    for (Object o : sortSink.collectedTuples) {
+      logger.debug(o.toString());
+    }
+    logger.debug("Done testing round\n");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
new file mode 100644
index 0000000..fccfa3b
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.LocalMode;
+
+public class WordCountMRApplicationTest
+{
+  private static Logger LOG = LoggerFactory.getLogger(WordCountMRApplicationTest.class);
+  @Rule
+  public MapOperatorTest.TestMeta testMeta = new MapOperatorTest.TestMeta();
+
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.WordCountExample.operator.Mapper.dirName", testMeta.testDir);
+    conf.setInt("dt.application.WordCountExample.operator.Mapper.partitionCount", 1);
+    conf.set("dt.application.WordCountExample.operator.Console.filePath", testMeta.testDir);
+    conf.set("dt.application.WordCountExample.operator.Console.outputFileName", "output.txt");
+    lma.prepareDAG(new NewWordCountApplication(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+    lc.run(5000);
+    lc.shutdown();
+    List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt"));
+    Map<String,Integer> readMap = Maps.newHashMap();
+    Iterator<String> itr = readLines.iterator();
+    while (itr.hasNext()) {
+      String[] splits = itr.next().split("=");
+      readMap.put(splits[0],Integer.valueOf(splits[1]));
+    }
+    Map<String,Integer> expectedMap = Maps.newHashMap();
+    expectedMap.put("1",2);
+    expectedMap.put("2",2);
+    expectedMap.put("3",2);
+    Assert.assertEquals("expected reduced data ", expectedMap, readMap);
+    LOG.info("read lines {}", readLines);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/resources/log4j.properties b/examples/mroperator/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mroperator/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pi/pom.xml b/examples/pi/pom.xml
new file mode 100644
index 0000000..6c4935f
--- /dev/null
+++ b/examples/pi/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-pi</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Pi Example</name>
+  <description>Apex example applications that calculate the value of Pi. This is a starting point to understand how Apex works.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>6.6.4</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/assemble/appPackage.xml b/examples/pi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/pi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
new file mode 100644
index 0000000..45f2b37
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation Example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiExample")
+public class Application implements StreamingApplication
+{
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
+    dag.addStream("rand_console",calc.output, console.input).setLocality(locality);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
new file mode 100644
index 0000000..fbd196a
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Very similar to PiExample but data is also written to an App Data operator for visualization.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiExampleAppData")
+public class ApplicationAppData implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "PiExampleDataSchema.json";
+
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
+
+
+    dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if (StringUtils.isEmpty(gatewayAddress)) {
+      throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
+    }
+
+    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+    AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
+
+    String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+
+    snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+
+    PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+    wsQuery.enableEmbeddedMode();
+    snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
+    PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+
+    wsQuery.setUri(uri);
+    wsResult.setUri(uri);
+    Operator.InputPort<String> queryResultPort = wsResult.input;
+
+    NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);;
+    dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
+    dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
new file mode 100644
index 0000000..8a9cc50
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.script.JavaScriptOperator;
+import com.datatorrent.lib.stream.RoundRobinHashMap;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation example : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula. This example inputs formula using java script operator.
+ *
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console:
+ * <pre>
+ * 2013-06-25 11:44:25,842 [container-2] DEBUG stram.StramChildAgent updateOperatorStatus - container-2 pendingDeploy []
+ * 2013-06-25 11:44:25,929 [ServerHelper-1-1] DEBUG netlet.AbstractClient send - allocating new sendBuffer4Offers of size 16384 for Server.Subscriber{type=rrhm_calc/3.inBindings, mask=0, partitions=null}
+ * 3.16
+ * 3.15
+ * 3.1616
+ * 3.148
+ * 3.1393846153846154
+ * </pre>
+ *
+ *  * Application DAG : <br>
+ * <img src="doc-files/ApplicationScript.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator} <br>
+ * StateFull : No</li>
+ *  <li><b>The rrhm Operator : </b> This operator takes input from random generator
+ *  creates tuples of (x,y) in round robin fashion. <br>
+ * Class : {@link com.datatorrent.lib.stream.RandomEventGenerator} <br>
+ * StateFull : Yes, tuple is emitted after (x, y) values have been aggregated.</li>
+ * <li><b>The calc operator : </b> This is java script operator implementing <br>
+ * Class : {@link com.datatorrent.lib.math.Script} <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). User can use any output adapter.  <br>
+ * .</li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiJavaScriptExample")
+public class ApplicationWithScript implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    int maxValue = 30000;
+
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    rand.setMinvalue(0);
+    rand.setMaxvalue(maxValue);
+
+    RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
+    rrhm.setKeys(new String[]{"x", "y"});
+
+    JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
+    calc.setPassThru(false);
+    calc.put("i",0);
+    calc.put("count",0);
+    calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }");
+
+    calc.setInvoke("pi");
+
+    dag.addStream("rand_rrhm", rand.integer_data, rrhm.data);
+    dag.addStream("rrhm_calc", rrhm.map, calc.inBindings);
+
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("rand_console",calc.result, console.input);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
new file mode 100644
index 0000000..672a931
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.math.Division;
+import com.datatorrent.lib.math.LogicalCompareToConstant;
+import com.datatorrent.lib.math.MultiplyByConstant;
+import com.datatorrent.lib.math.RunningAverage;
+import com.datatorrent.lib.math.Sigma;
+import com.datatorrent.lib.math.SquareCalculus;
+import com.datatorrent.lib.stream.AbstractAggregator;
+import com.datatorrent.lib.stream.ArrayListAggregator;
+import com.datatorrent.lib.stream.Counter;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * <p>Calculator class.</p>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "PiLibraryExample")
+public class Calculator implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    /* keep generating random values between 0 and 30000 */
+    RandomEventGenerator xyGenerator = dag.addOperator("GenerateX", RandomEventGenerator.class);
+
+    /* calculate square of each of the values it receives */
+    SquareCalculus squareOperator = dag.addOperator("SquareX", SquareCalculus.class);
+
+    /* pair the consecutive values */
+    AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>());
+    Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>());
+    LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>());
+    comparator.setConstant(30000 * 30000);
+    Counter inCircle = dag.addOperator("CountInCircle", Counter.class);
+    Counter inSquare = dag.addOperator("CountInSquare", Counter.class);
+    Division division = dag.addOperator("Ratio", Division.class);
+    MultiplyByConstant multiplication = dag.addOperator("InstantPI", MultiplyByConstant.class);
+    multiplication.setMultiplier(4);
+    RunningAverage average = dag.addOperator("AveragePI", new RunningAverage());
+    ConsoleOutputOperator oper = dag.addOperator("Console", new ConsoleOutputOperator());
+
+    dag.addStream("x", xyGenerator.integer_data, squareOperator.input);
+    dag.addStream("sqr", squareOperator.integerResult, pairOperator.input);
+    dag.addStream("x2andy2", pairOperator.output, sumOperator.input);
+    dag.addStream("x2plusy2", sumOperator.integerResult, comparator.input, inSquare.input);
+    dag.addStream("inCirclePoints", comparator.greaterThan, inCircle.input);
+    dag.addStream("numerator", inCircle.output, division.numerator);
+    dag.addStream("denominator", inSquare.output, division.denominator);
+    dag.addStream("ratio", division.doubleQuotient, multiplication.input);
+    dag.addStream("instantPi", multiplication.doubleProduct, average.input);
+    dag.addStream("averagePi", average.doubleAverage, oper.input);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
new file mode 100644
index 0000000..aef8a0c
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>An operator which converts a raw value to a named value singleton list.</p>
+ * AppDataSnapshotServerMap.input accepts a List<Map<String,Object>> so we use this operator to
+ * convert individual values to a singleton list of a named value
+ * <p>
+ * @displayNamed Value
+ * @tags count
+ * @since 3.2.0
+ */
+public class NamedValueList<T> extends BaseOperator
+{
+  @NotNull
+  private String valueName;
+
+  private List<Map<String, T>> valueList;
+  private Map<String, T> valueMap;
+
+  public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T val)
+    {
+      valueMap.put(valueName, val);
+      outPort.emit(valueList);
+    }
+  };
+
+  public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new DefaultOutputPort<>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    valueMap = new HashMap<>();
+    valueMap.put(valueName, null);
+    valueList = Collections.singletonList(valueMap);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public String getValueName()
+  {
+    return valueName;
+  }
+
+  public void setValueName(String name)
+  {
+    valueName = name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
new file mode 100644
index 0000000..b710a96
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This operator implements Monte Carlo estimation of pi. For points randomly distributed points on
+ * square circle. pi ~= Number of points in circle/Total number of points * 4.
+ *
+ * @since 0.3.2
+ */
+public class PiCalculateOperator extends BaseOperator
+{
+  private transient int x = -1;
+  private transient int y = -1;
+  private int base;
+  private long inArea = 0;
+  private long totalArea = 0;
+  public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+  {
+    @Override
+    public void process(Integer tuple)
+    {
+      if (x == -1) {
+        x = tuple;
+      } else {
+        y = tuple;
+        if (x * x + y * y <= base) {
+          inArea++;
+        }
+        totalArea++;
+        x = y = -1;
+      }
+    }
+
+  };
+  public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<Double>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    logger.info("inArea {} totalArea {}", inArea, totalArea);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    output.emit((double)inArea / totalArea * 4);
+  }
+
+  public void setBase(int num)
+  {
+    base = num;
+  }
+
+  public int getBase()
+  {
+    return base;
+  }
+
+  private static Logger logger = LoggerFactory.getLogger(PiCalculateOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif
new file mode 100644
index 0000000..9545c6c
Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif
new file mode 100644
index 0000000..29f9ef8
Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
new file mode 100644
index 0000000..3ebad48
--- /dev/null
+++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Pi calculation demonstration application.
+ */
+package org.apache.apex.examples.pi;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/META-INF/properties.xml b/examples/pi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..95df4c7
--- /dev/null
+++ b/examples/pi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- Memory settings for all examples -->
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>128</value>
+  </property>
+
+  <!-- PiExample  -->
+  <property>
+    <name>dt.application.PiExample.operator.rand.minvalue</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>dt.application.PiExample.operator.rand.maxvalue</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.PiExample.operator.picalc.base</name>
+    <value>900000000</value>
+  </property>
+  <property>
+    <name>dt.application.PiExample.operator.adaptor.valueName</name>
+    <value>piValue</value>
+  </property>
+
+  <!-- PiExampleAppData  -->
+  <property>
+    <name>dt.application.PiExampleAppData.operator.rand.minvalue</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.rand.maxvalue</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.picalc.base</name>
+    <value>900000000</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.adaptor.valueName</name>
+    <value>piValue</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.Query.topic</name>
+    <value>PiExampleQuery</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.QueryResult.topic</name>
+    <value>PiExampleQueryResult</value>
+  </property>
+  <property>
+    <name>dt.application.PiExampleAppData.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>PiExampleQuery</value>
+  </property>
+
+  <!-- PiLibraryExample -->
+  <property>
+    <name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.PiLibraryExample.operator.PairXY.size</name>
+    <value>2</value>
+  </property>
+
+  <!-- PiJavaScriptExample -->
+  <property>
+    <name>dt.application.PiJavaScriptExample.operator.rand.tuplesBlast</name>
+    <value>9</value>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/PiExampleDataSchema.json
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/PiExampleDataSchema.json b/examples/pi/src/main/resources/PiExampleDataSchema.json
new file mode 100644
index 0000000..47db8eb
--- /dev/null
+++ b/examples/pi/src/main/resources/PiExampleDataSchema.json
@@ -0,0 +1,3 @@
+{
+  "values": [{"name": "piValue", "type": "double"}]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/app/PiJsonExample.json
----------------------------------------------------------------------
diff --git a/examples/pi/src/main/resources/app/PiJsonExample.json b/examples/pi/src/main/resources/app/PiJsonExample.json
new file mode 100644
index 0000000..b1e2972
--- /dev/null
+++ b/examples/pi/src/main/resources/app/PiJsonExample.json
@@ -0,0 +1,52 @@
+{
+  "description": "Pi JSON Example that is intended to demonstrate the capability of specifying an app using JSON",
+  "operators": [
+    {
+      "name": "rand",
+      "class": "com.datatorrent.lib.testbench.RandomEventGenerator",
+      "properties": {
+        "minvalue": 0,
+        "maxvalue": 30000
+      }
+    },
+    {
+      "name": "picalc",
+      "class": "com.datatorrent.examples.pi.PiCalculateOperator",
+      "properties": {
+        "base": 900000000
+      }
+    },
+    {
+      "name": "console",
+      "class": "com.datatorrent.lib.io.ConsoleOutputOperator"
+    }
+  ],
+  "streams": [
+    {
+      "name": "rand_calc",
+      "source": {
+        "operatorName": "rand",
+        "portName": "integer_data"
+      },
+      "sinks": [
+        {
+          "operatorName": "picalc",
+          "portName": "input"
+        }
+      ]
+    },
+    {
+      "name": "calc_console",
+      "source": {
+        "operatorName": "picalc",
+        "portName": "output"
+      },
+      "sinks": [
+        {
+          "operatorName": "console",
+          "portName": "input"
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
new file mode 100644
index 0000000..d71288f
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ *
+ */
+public class ApplicationTest
+{
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-pi.xml");
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
new file mode 100644
index 0000000..9551c52
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ *
+ */
+public class ApplicationWithScriptTest
+{
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    LocalMode.runApp(new ApplicationWithScript(), 10000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
new file mode 100644
index 0000000..34ad387
--- /dev/null
+++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.pi;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ *
+ */
+public class CalculatorTest
+{
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-pilibrary.xml");
+    lma.prepareDAG(new Calculator(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pi.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/dt-site-pi.xml b/examples/pi/src/test/resources/dt-site-pi.xml
new file mode 100644
index 0000000..6032400
--- /dev/null
+++ b/examples/pi/src/test/resources/dt-site-pi.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+	<name>dt.application.PiExample.class</name>
+	<value>org.apache.apex.examples.pi.Application</value>
+	<description>An alias for the application</description>
+</property> 
+<property>
+	<name>dt.application.PiExample.operator.rand.minvalue</name>
+	<value>0</value>
+</property>
+<property>
+	<name>dt.application.PiExample.operator.rand.maxvalue</name>
+	<value>30000</value>
+</property>
+<property>
+	<name>dt.application.PiExample.operator.picalc.base</name>
+	<value>900000000</value>
+</property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pilibrary.xml
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/dt-site-pilibrary.xml b/examples/pi/src/test/resources/dt-site-pilibrary.xml
new file mode 100644
index 0000000..8f1ae8b
--- /dev/null
+++ b/examples/pi/src/test/resources/dt-site-pilibrary.xml
@@ -0,0 +1,45 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+ <property>
+	<name>dt.application.PiLibraryExample.class</name>
+	<value>org.apache.apex.examples.pi.Calculator</value>
+	<description>An alias for the application</description>
+</property> 
+<property>
+	<name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name>
+	<value>0</value>
+</property>
+<property>
+	<name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name>
+	<value>30000</value>
+</property>
+<property>
+	<name>dt.application.PiLibraryExample.operator.PairXY.size</name>
+	<value>2</value>
+</property>
+<!--
+<property>
+	<name>dt.application.PiLibraryExample.operator.AnalyzeLocation.constant</name>
+	<value>900000000</value>
+</property> -->
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/pi/src/test/resources/log4j.properties b/examples/pi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/pi/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug