You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:12 UTC

[07/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/assemble/appPackage.xml b/examples/mobile/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mobile/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
new file mode 100644
index 0000000..f719643
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Mobile Example Application:
+ * <p>
+ * This example simulates a large number of cell phones in the range of 40K to 200K
+ * and tracks a given cell number across cell towers. It also displays the changing locations of the cell number on a Google map.
+ *
+ * This example demonstrates the scalability feature of the Apex platform.
+ * It showcases the ability of the platform to scale up and down as the phone numbers generated increase and decrease respectively.
+ * If the tuples processed per second by the pmove operator increase beyond 30,000, more partitions of the pmove operator get deployed until
+ * each partition processes around 10,000 to 30,000 tuples per second.
+ * If the tuples processed per second drop below 10,000, the platform merges the operators until the partition count drops down to the original.
+ * The load can be varied using the tuplesBlast property.
+ * If the tuplesBlast is set to 200, 40K cell phones are generated.
+ * If the tuplesBlast is set to 1000, 200K cell phones are generated.
+ * The tuplesBlast property can be set using dtcli command: 'set-operator-property pmove tuplesBlast 1000'.
+ *
+ *
+ * The specs are as such<br>
+ * Depending on the tuplesBlast property, large number of cell phone numbers are generated.
+ * They jump a cell tower frequently. Sometimes
+ * within a second sometimes in 10 seconds. The aim is to demonstrate the
+ * following abilities<br>
+ * <ul>
+ * <li>Entering query dynamically: Phone numbers are added at run time to locate
+ * their GPS positions.</li>
+ * <li>Changing functionality dynamically: The load is changed by making
+ * functional changes on the load generator operator (phonegen)</li>
+ * <li>Auto Scale up/Down with load: Operator pmove increases and decreases
+ * partitions as per load</li>
+ * <li></li>
+ * </ul>
+ *
+ * Refer to examples/docs/MobileDemo.md for more information.
+ *
+ * <p>
+ *
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console: <br>
+ *
+ * <pre>
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * </pre>
+ *
+ * <b>Application DAG : </b><br>
+ * <img src="doc-files/Mobile.png" width=600px > <br>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "MobileExample")
+public class Application implements StreamingApplication
+{
+  public static final String PHONE_RANGE_PROP = "dt.application.MobileExample.phoneRange";
+  public static final String TOTAL_SEED_NOS = "dt.application.MobileExample.totalSeedNumbers";
+  public static final String COOL_DOWN_MILLIS = "dt.application.MobileExample.coolDownMillis";
+  public static final String MAX_THROUGHPUT = "dt.application.MobileExample.maxThroughput";
+  public static final String MIN_THROUGHPUT = "dt.application.MobileExample.minThroughput";
+  private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+  private Range<Integer> phoneRange = Range.between(5550000, 5559999);
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  { // Wires Receiver -> LocationFinder -> LocationResults, plus QueryLocation -> LocationFinder
+    String lPhoneRange = conf.get(PHONE_RANGE_PROP, null);
+    if (lPhoneRange != null) {
+      String[] tokens = lPhoneRange.split("-"); // expected format: "<min>-<max>"
+      if (tokens.length != 2) {
+        throw new IllegalArgumentException("Invalid range: " + lPhoneRange);
+      }
+      this.phoneRange = Range.between(Integer.parseInt(tokens[0].trim()), Integer.parseInt(tokens[1].trim())); // trim so "a - b" is accepted too
+    }
+    LOG.debug("Phone range {}", this.phoneRange);
+
+    RandomEventGenerator phones = dag.addOperator("Receiver", RandomEventGenerator.class);
+    phones.setMinvalue(this.phoneRange.getMinimum());
+    phones.setMaxvalue(this.phoneRange.getMaximum());
+
+    PhoneMovementGenerator movementGen = dag.addOperator("LocationFinder", PhoneMovementGenerator.class);
+    dag.setAttribute(movementGen, OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+
+    StatelessThroughputBasedPartitioner<PhoneMovementGenerator> partitioner = new StatelessThroughputBasedPartitioner<PhoneMovementGenerator>();
+    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 45000));
+    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+    dag.setAttribute(movementGen, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); // the partitioner is registered both as stats listener and as partitioner
+    dag.setAttribute(movementGen, OperatorContext.PARTITIONER, partitioner);
+
+    // generate seed numbers: random phones from the configured range that are registered up front
+    Random random = new Random();
+    int maxPhone = phoneRange.getMaximum() - phoneRange.getMinimum(); // width of the range; the "+ 1" below makes the maximum reachable
+    int phonesToDisplay = conf.getInt(TOTAL_SEED_NOS, 10);
+    for (int i = phonesToDisplay; i-- > 0; ) {
+      int phoneNo = phoneRange.getMinimum() + random.nextInt(maxPhone + 1);
+      LOG.info("seed no: {}", phoneNo);
+      movementGen.phoneRegister.add(phoneNo);
+    }
+    // done generating data
+    LOG.info("Finished generating seed data.");
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); // NOTE(review): may be null when no gateway is configured, yielding "ws://null/pubsub" - confirm
+    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
+    wsOut.setUri(uri);
+    PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>());
+    wsIn.setUri(uri);
+    // default partitioning: first connected stream to movementGen will be partitioned
+    dag.addStream("Phone-Data", phones.integer_data, movementGen.data);
+    dag.addStream("Results", movementGen.locationQueryResult, wsOut.input);
+    dag.addStream("Query", wsIn.outputPort, movementGen.phoneQuery);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
new file mode 100644
index 0000000..f8de357
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.util.Map;
+import java.util.Random;
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Generates mobile numbers that will be displayed in the mobile example just after launch.<br>
+ * <b>Operator attributes:</b>
+ * <ul>
+ *   <li>initialDisplayCount: No. of seed phone numbers that will be generated.</li>
+ *   <li>maxSeedPhoneNumber: The largest seed phone number.</li>
+ * </ul>
+ *
+ *
+ * @since 0.3.5
+ */
+public class PhoneEntryOperator extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class);
+
+  private boolean seedGenerationDone = false;
+
+  @Min(0)
+  private int initialDisplayCount = 0;
+
+  private int maxSeedPhoneNumber = 0;
+  private int rangeLowerEndpoint;
+  private int rangeUpperEndpoint;
+
+  /**
+   * Sets the initial number of phones to display on the google map.
+   *
+   * @param i the count of initial phone numbers to display
+   */
+  public void setInitialDisplayCount(int i)
+  {
+    initialDisplayCount = i;
+  }
+
+  /**
+   * Sets the range for the phone numbers generated by the operator.
+   *
+   * @param phoneRange the range within which the phone numbers are randomly generated.
+   */
+  public void setPhoneRange(Range<Integer> phoneRange)
+  {
+    this.rangeLowerEndpoint = phoneRange.lowerEndpoint();
+    this.rangeUpperEndpoint = phoneRange.upperEndpoint();
+  }
+
+  /**
+   * Sets the largest phone number that may be generated as a seed.
+   *
+   * @param number the upper bound for seed phone numbers; it is used only when it lies within the phone range.
+   */
+  public void setMaxSeedPhoneNumber(int number)
+  {
+    this.maxSeedPhoneNumber = number;
+  }
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>()
+  {
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      seedPhones.emit(tuple);
+    }
+  };
+
+  public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (!seedGenerationDone) { // the seed phones are emitted only once; the flag is set at the end of this branch
+      Random random = new Random();
+      int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint;
+      maxPhone -= rangeLowerEndpoint; // offset from the configured lower bound instead of a hard-coded 5550000
+      int phonesToDisplay = initialDisplayCount > maxPhone ? maxPhone : initialDisplayCount;
+      for (int i = phonesToDisplay; i-- > 0; ) {
+        int phoneNo = rangeLowerEndpoint + random.nextInt(maxPhone + 1); // stays within [rangeLowerEndpoint, rangeLowerEndpoint + maxPhone]
+        LOG.info("seed no: " + phoneNo);
+        Map<String, String> valueMap = Maps.newHashMap();
+        valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD);
+        valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo));
+        seedPhones.emit(valueMap);
+      }
+      // done generating data
+      seedGenerationDone = true;
+      LOG.info("Finished generating seed data.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
new file mode 100644
index 0000000..3a293f0
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.util.HighLow;
+
+/**
+ * <p>
+ * This operator generates the GPS locations for the phone numbers specified.
+ * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated.
+ * It supports querying the locations of a given phone number.
+ * This is a partitionable operator that can partition as the tuplesBlast increases.
+ * </p>
+ *
+ * @since 0.3.2
+ */
+public class PhoneMovementGenerator extends BaseOperator
+{
+  public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>() // receives phone numbers and advances each phone's location
+  {
+    @Override
+    public void process(Integer tuple)
+    {
+      HighLow<Integer> loc = gps.get(tuple); // location stored for this phone, if any
+      if (loc == null) {
+        loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range)); // first sighting: random cell with both coordinates in [0, range)
+        gps.put(tuple, loc);
+      }
+      int xloc = loc.getHigh(); // x is kept in the "high" slot
+      int yloc = loc.getLow(); // y is kept in the "low" slot
+      int state = rotate % 4; // direction selector derived from the per-tuple counter
+
+      // Compute new location: each axis moves by one step only when delta >= threshold
+      int delta = random.nextInt(100);
+      if (delta >= threshold) {
+        if (state < 2) {
+          xloc++;
+        } else {
+          xloc--;
+        }
+        if (xloc < 0) {
+          xloc += range; // wrap around the lower edge
+        }
+      }
+      delta = random.nextInt(100);
+      if (delta >= threshold) {
+        if ((state == 1) || (state == 3)) {
+          yloc++;
+        } else {
+          yloc--;
+        }
+        if (yloc < 0) {
+          yloc += range; // wrap around the lower edge
+        }
+      }
+      xloc %= range; // wrap around the upper edge
+      yloc %= range;
+
+      // Stage the new location in newgps; endWindow() copies it into gps
+      HighLow<Integer> nloc = newgps.get(tuple);
+      if (nloc == null) {
+        newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
+      } else {
+        nloc.setHigh(xloc);
+        nloc.setLow(yloc);
+      }
+      rotate++;
+    }
+  };
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
+  {
+    @Override
+    public void process(Map<String,String> tuple)
+    {
+      LOG.info("new query {}", tuple);
+      String command = tuple.get(KEY_COMMAND);
+      if (command != null) {
+        if (command.equals(COMMAND_ADD)) {
+          commandCounters.getCounter(CommandCounters.ADD).increment();
+          String phoneStr = tuple.get(KEY_PHONE);
+          registerPhone(phoneStr);
+        } else if (command.equals(COMMAND_ADD_RANGE)) {
+          commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
+          registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
+        } else if (command.equals(COMMAND_DELETE)) {
+          commandCounters.getCounter(CommandCounters.DELETE).increment();
+          String phoneStr = tuple.get(KEY_PHONE);
+          deregisterPhone(phoneStr);
+        } else if (command.equals(COMMAND_CLEAR)) {
+          commandCounters.getCounter(CommandCounters.CLEAR).increment();
+          clearPhones();
+        }
+      }
+    }
+  };
+
+  public static final String KEY_COMMAND = "command";
+  public static final String KEY_PHONE = "phone";
+  public static final String KEY_LOCATION = "location";
+  public static final String KEY_REMOVED = "removed";
+  public static final String KEY_START_PHONE = "startPhone";
+  public static final String KEY_END_PHONE = "endPhone";
+
+  public static final String COMMAND_ADD = "add";
+  public static final String COMMAND_ADD_RANGE = "addRange";
+  public static final String COMMAND_DELETE = "del";
+  public static final String COMMAND_CLEAR = "clear";
+
+  final Set<Integer> phoneRegister = Sets.newHashSet();
+
+  private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>();
+  private final Random random = new Random();
+  private int range = 50;
+  private int threshold = 80;
+  private int rotate = 0;
+
+  protected BasicCounters<MutableLong> commandCounters;
+
+  private transient OperatorContext context;
+  private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>();
+
+  public PhoneMovementGenerator()
+  {
+    this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class);
+  }
+
+  /**
+   * @return the size of the location grid; both coordinates are generated in [0, range)
+   */
+  @Min(1) // a range of 0 would make random.nextInt(range) and "% range" throw in the data port
+  public int getRange()
+  {
+    return range;
+  }
+
+  /**
+   * Sets the size of the grid on which the GPS locations are generated.
+   *
+   * @param i the exclusive upper bound for both location coordinates
+   */
+  public void setRange(int i)
+  {
+    range = i;
+  }
+
+  /**
+   * @return the threshold
+   */
+  @Min(0)
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  /**
+   * Sets the threshold that decides how frequently the GPS locations are updated.
+   *
+   * @param i the value that decides how frequently the GPS locations change.
+   */
+  public void setThreshold(int i)
+  {
+    threshold = i;
+  }
+
+  private void registerPhone(String phoneStr)
+  {
+    // a missing phone number is ignored silently; a non-numeric one is logged and ignored
+    if (Strings.isNullOrEmpty(phoneStr)) {
+      return;
+    }
+    try {
+      int phone = Integer.parseInt(phoneStr); // avoids the deprecated Integer(String) constructor
+      registerSinglePhone(phone);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid no {}", phoneStr);
+    }
+  }
+
+  private void registerPhoneRange(String startPhoneStr, String endPhoneStr)
+  {
+    if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) {
+      LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr);
+      return;
+    }
+    try {
+      int startPhone = Integer.parseInt(startPhoneStr); // avoids the deprecated Integer(String) constructor
+      int endPhone = Integer.parseInt(endPhoneStr);
+      if (endPhone < startPhone) {
+        LOG.warn("Invalid phone range {} {}", startPhone, endPhone);
+        return;
+      }
+      for (long i = startPhone; i <= endPhone; i++) { // long counter: an int would wrap and never terminate when endPhone is Integer.MAX_VALUE
+        registerSinglePhone((int)i); // both endpoints are registered
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
+    }
+  }
+
+  private void registerSinglePhone(int phone)
+  {
+    phoneRegister.add(phone);
+    LOG.debug("Registered query id with phone {}", phone);
+    emitQueryResult(phone);
+  }
+
+  private void deregisterPhone(String phoneStr)
+  {
+    if (Strings.isNullOrEmpty(phoneStr)) {
+      return;
+    }
+    try {
+      Integer phone = Integer.valueOf(phoneStr); // avoids the deprecated Integer(String) constructor
+      // remove the channel; Set.remove reports whether the phone was registered,
+      // so a separate contains() lookup is unnecessary
+      if (phoneRegister.remove(phone)) {
+        LOG.debug("Removing query id {}", phone);
+        emitPhoneRemoved(phone);
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid phone {}", phoneStr);
+    }
+  }
+
+  private void clearPhones()
+  {
+    phoneRegister.clear();
+    LOG.info("Clearing phones");
+  }
+
+  public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    this.context = context;
+    commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
+    commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
+    commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
+    commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
+  }
+
+  /**
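+   * Merges the locations staged during the window into the location map, emits the location of every registered phone, clears the staging map and publishes the command counters.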
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) {
+      HighLow<Integer> loc = gps.get(e.getKey());
+      if (loc == null) {
+        gps.put(e.getKey(), e.getValue());
+      } else {
+        loc.setHigh(e.getValue().getHigh());
+        loc.setLow(e.getValue().getLow());
+      }
+    }
+    boolean found = false;
+    for (Integer phone: phoneRegister) {
+      emitQueryResult( phone);
+      found = true;
+    }
+    if (!found) {
+      LOG.debug("No phone number");
+    }
+    newgps.clear();
+    context.setCounters(commandCounters);
+  }
+
+  private void emitQueryResult(Integer phone)
+  {
+    HighLow<Integer> loc = gps.get(phone);
+    if (loc != null) {
+      Map<String, String> queryResult = new HashMap<String, String>();
+      queryResult.put(KEY_PHONE, String.valueOf(phone));
+      queryResult.put(KEY_LOCATION, loc.toString());
+      locationQueryResult.emit(queryResult);
+    }
+  }
+
+  private void emitPhoneRemoved(Integer phone)
+  {
+    Map<String,String> removedResult = Maps.newHashMap();
+    removedResult.put(KEY_PHONE, String.valueOf(phone));
+    removedResult.put(KEY_REMOVED,"true");
+    locationQueryResult.emit(removedResult);
+  }
+
+  public static enum CommandCounters
+  {
+    ADD, ADD_RANGE, DELETE, CLEAR
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png
new file mode 100644
index 0000000..a25da0d
Binary files /dev/null and b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
new file mode 100644
index 0000000..1262504
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Mobile phones tracking demonstration application.
+ */
+package org.apache.apex.examples.mobile;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/resources/META-INF/properties.xml b/examples/mobile/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..e213d18
--- /dev/null
+++ b/examples/mobile/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,82 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.application.MobileExample.coolDownMillis</name>
+    <value>45000</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.maxThroughput</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.minThroughput</name>
+    <value>10000</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name>
+    <value>200</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.range</name>
+    <value>20</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.threshold</name>
+    <value>80</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationResults.prop.topic</name>
+    <value>examples.mobile.phoneLocationQueryResult</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.QueryLocation.prop.topic</name>
+    <value>examples.mobile.phoneLocationQuery</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name>
+    <value>768</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128m</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
new file mode 100644
index 0000000..ce6ca41
--- /dev/null
+++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.Servlet;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class ApplicationTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+
+  public ApplicationTest()
+  {
+  }
+
+  /**
+   * Test of getApplication method, of class Application.
+   */
+  @Test
+  public void testGetApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-mobile.xml");
+    Server server = new Server(0);
+    Servlet servlet = new SamplePubSubWebSocketServlet();
+    ServletHolder sh = new ServletHolder(servlet);
+    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
+    contextHandler.addServlet(sh, "/pubsub");
+    contextHandler.addServlet(sh, "/*");
+    server.start();
+    Connector[] connector = server.getConnectors();
+    conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
+    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+
+    PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
+    outputOperator.setUri(uri);
+    outputOperator.setTopic(conf.get("dt.application.MobileExample.operator.QueryLocation.topic"));
+
+    PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>();
+    inputOperator.setUri(uri);
+    inputOperator.setTopic(conf.get("dt.application.MobileExample.operator.LocationResults.topic"));
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+    inputOperator.outputPort.setSink(sink);
+
+    Map<String, String> data = new HashMap<String, String>();
+    data.put("command", "add");
+    data.put("phone", "5559990");
+
+    Application app = new Application();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+    lc.runAsync();
+    Thread.sleep(5000);
+    inputOperator.setup(null);
+    outputOperator.setup(null);
+    inputOperator.activate(null);
+    outputOperator.beginWindow(0);
+    outputOperator.input.process(data);
+    outputOperator.endWindow();
+    inputOperator.beginWindow(0);
+    int timeoutMillis = 5000;
+    while (sink.collectedTuples.size() < 5 && timeoutMillis > 0) {
+      inputOperator.emitTuples();
+      timeoutMillis -= 20;
+      Thread.sleep(20);
+    }
+    inputOperator.endWindow();
+    lc.shutdown();
+    inputOperator.teardown();
+    outputOperator.teardown();
+    server.stop();
+    Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5);
+    for (Object obj : sink.collectedTuples) {
+      Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/dt-site-mobile.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/resources/dt-site-mobile.xml b/examples/mobile/src/test/resources/dt-site-mobile.xml
new file mode 100644
index 0000000..dcb6c98
--- /dev/null
+++ b/examples/mobile/src/test/resources/dt-site-mobile.xml
@@ -0,0 +1,87 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.application.MobileExample.class</name>
+    <value>org.apache.apex.examples.mobile.Application</value>
+    <description>An alias for the application</description>
+  </property>
+  <!--property>
+    <name>dt.attr.GATEWAY_CONNECT_ADDRESS</name>
+    <value>localhost:19090</value>
+  </property-->
+  <property>
+    <name>dt.application.MobileExample.totalSeedNumbers</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.coolDownMillis</name>
+    <value>45000</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.maxThroughput</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.minThroughput</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name>
+    <value>200</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.range</name>
+    <value>20</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.threshold</name>
+    <value>80</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.LocationResults.topic</name>
+    <value>resultTopic</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.QueryLocation.topic</name>
+    <value>queryTopic</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name>
+    <value>2048</value>
+  </property>
+  <property>
+    <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/resources/log4j.properties b/examples/mobile/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mobile/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/pom.xml b/examples/mrmonitor/pom.xml
new file mode 100644
index 0000000..527565b
--- /dev/null
+++ b/examples/mrmonitor/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-mrmonitor</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar MR Monitoring Example</name>
+  <description></description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-websocket</artifactId>
+      <version>8.1.10.v20130312</version>
+      <scope>test</scope>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>8.1.10.v20130312</version>
+      <scope>test</scope>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.3.5</version>
+      <type>jar</type>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/assemble/appPackage.xml b/examples/mrmonitor/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mrmonitor/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
new file mode 100644
index 0000000..0557919
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.SeedEventGenerator;
+
+/**
+ * Application
+ *
+ * @since 2.0.0
+ */
+@ApplicationAnnotation(name = "MyFirstApplication")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Sample DAG with 2 operators
+    // Replace this code with the DAG you want to build
+
+    SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class);
+    seedGen.setSeedStart(1);
+    seedGen.setSeedEnd(10);
+    seedGen.addKeyData("x", 0, 10);
+    seedGen.addKeyData("y", 0, 100);
+
+    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
+    cons.setStringFormat("hello: %s");
+
+    dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
new file mode 100644
index 0000000..1ed1f26
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+/**
+ * <p>Constants class.</p>
+ *
+ * @since 0.3.4
+ */
+public interface Constants
+{
+
+  public static final int MAX_NUMBER_OF_JOBS = 25;
+
+  public static final String REDUCE_TASK_TYPE = "REDUCE";
+  public static final String MAP_TASK_TYPE = "MAP";
+  public static final String TASK_TYPE = "type";
+  public static final String TASK_ID = "id";
+
+  public static final String LEAGACY_TASK_ID = "taskId";
+  public static final int MAX_TASKS = 2000;
+
+  public static final String QUERY_APP_ID = "app_id";
+  public static final String QUERY_JOB_ID = "job_id";
+  public static final String QUERY_HADOOP_VERSION = "hadoop_version";
+  public static final String QUERY_API_VERSION = "api_version";
+  public static final String QUERY_RM_PORT = "rm_port";
+  public static final String QUERY_HS_PORT = "hs_port";
+  public static final String QUERY_HOST_NAME = "hostname";
+  public static final String QUERY_KEY_COMMAND = "command";
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
new file mode 100644
index 0000000..24b3887
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.mrmonitor.MRStatusObject.TaskObject;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+
+/**
+ * <p>
+ * MRJobStatusOperator class.
+ * </p>
+ *
+ * @since 0.3.4
+ */
+public class MRJobStatusOperator implements Operator, IdleTimeHandler
+{
+  private static final Logger LOG = LoggerFactory.getLogger(MRJobStatusOperator.class);
+
+  private static final String JOB_PREFIX = "job_";
+  /**
+   * This outputs the meta information of the job
+   */
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  /**
+   * This outputs the map task information of the job
+   */
+  public final transient DefaultOutputPort<String> mapOutput = new DefaultOutputPort<String>();
+  /**
+   * This outputs the reduce task information of the job
+   */
+  public final transient DefaultOutputPort<String> reduceOutput = new DefaultOutputPort<String>();
+  /**
+   * This outputs the counter information of the job
+   */
+  public final transient DefaultOutputPort<String> counterOutput = new DefaultOutputPort<String>();
+  /**
+   * This is the time in ms to wait before making a new request for data
+   */
+  private transient int sleepTime = 100;
+  /**
+   * This is the number of consecutive windows of no change before the job is removed from the map
+   */
+  private int maxRetrials = 10;
+  /**
+   * The number of minutes for which the status history of map and reduce tasks is stored
+   */
+  private int statusHistoryTime = 60;
+  private Map<String, MRStatusObject> jobMap = new HashMap<String, MRStatusObject>();
+  /**
+   * This represents the maximum number of jobs that a single instance of this operator is going to serve at any time
+   */
+  private int maxJobs = Constants.MAX_NUMBER_OF_JOBS;
+  private transient Iterator<MRStatusObject> iterator;
+
+  /*
+   * each input string is a map of the following format {"app_id":<>,"hadoop_version":<>,"api_version":<>,"command":<>,
+   * "hostname":<>,"hs_port":<>,"rm_port":<>,"job_id":<>}
+   */
+  public final transient DefaultInputPort<MRStatusObject> input = new DefaultInputPort<MRStatusObject>()
+  {
+    @Override
+    public void process(MRStatusObject mrStatusObj)
+    {
+
+      if (jobMap == null) {
+        jobMap = new HashMap<String, MRStatusObject>();
+      }
+
+      if (jobMap.size() >= maxJobs) {
+        return;
+      }
+
+      if ("delete".equalsIgnoreCase(mrStatusObj.getCommand())) {
+        removeJob(mrStatusObj.getJobId());
+        JSONObject outputJsonObject = new JSONObject();
+        try {
+          outputJsonObject.put("id", mrStatusObj.getJobId());
+          outputJsonObject.put("removed", "true");
+          output.emit(outputJsonObject.toString());
+        } catch (JSONException e) {
+          LOG.warn("Error creating JSON: {}", e.getMessage());
+        }
+        return;
+      }
+      if ("clear".equalsIgnoreCase(mrStatusObj.getCommand())) {
+        clearMap();
+        return;
+      }
+
+      if (jobMap.get(mrStatusObj.getJobId()) != null) {
+        mrStatusObj = jobMap.get(mrStatusObj.getJobId());
+      }
+      if (mrStatusObj.getHadoopVersion() == 2) {
+        getJsonForJob(mrStatusObj);
+      } else if (mrStatusObj.getHadoopVersion() == 1) {
+        getJsonForLegacyJob(mrStatusObj);
+      }
+      mrStatusObj.setStatusHistoryCount(statusHistoryTime);
+      iterator = jobMap.values().iterator();
+      emitHelper(mrStatusObj);
+    }
+  };
+
+  public int getStatusHistoryTime()
+  {
+    return statusHistoryTime;
+  }
+
+  public void setStatusHistoryTime(int statusHistoryTime)
+  {
+    this.statusHistoryTime = statusHistoryTime;
+    if (jobMap != null && jobMap.size() > 0) {
+      for (Entry<String, MRStatusObject> entry : jobMap.entrySet()) {
+        entry.getValue().setStatusHistoryCount(statusHistoryTime);
+      }
+    }
+
+  }
+
+  /**
+   * This method gets the latest status of the job from the Resource Manager for jobs submitted on hadoop 2.x version
+   *
+   * @param statusObj
+   */
+  private void getJsonForJob(MRStatusObject statusObj)
+  {
+
+    String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId();
+    String responseBody = MRUtil.getJsonForURL(url);
+
+    JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+
+    if (jsonObj == null) {
+      url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId();
+      responseBody = MRUtil.getJsonForURL(url);
+      jsonObj = MRUtil.getJsonObject(responseBody);
+    }
+
+    if (jsonObj != null) {
+      if (jobMap.get(statusObj.getJobId()) != null) {
+        MRStatusObject tempObj = jobMap.get(statusObj.getJobId());
+        if (tempObj.getJsonObject().toString().equals(jsonObj.toString())) {
+          getJsonsForTasks(statusObj);
+          getCounterInfoForJob(statusObj);
+          return;
+        }
+      }
+      statusObj.setModified(true);
+      statusObj.setJsonObject(jsonObj);
+      getCounterInfoForJob(statusObj);
+      getJsonsForTasks(statusObj);
+      jobMap.put(statusObj.getJobId(), statusObj);
+    }
+  }
+
+  /**
+   * This method is used to collect the metric information about the job
+   *
+   * @param statusObj
+   */
+  private void getCounterInfoForJob(MRStatusObject statusObj)
+  {
+    String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
+    String responseBody = MRUtil.getJsonForURL(url);
+    JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+    if (jsonObj == null) {
+      url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
+      responseBody = MRUtil.getJsonForURL(url);
+      jsonObj = MRUtil.getJsonObject(responseBody);
+    }
+
+    if (jsonObj != null) {
+      if (statusObj.getMetricObject() == null) {
+        statusObj.setMetricObject(new TaskObject(jsonObj));
+      } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) {
+        statusObj.getMetricObject().setJson(jsonObj);
+        statusObj.getMetricObject().setModified(true);
+      }
+    }
+  }
+
+  /**
+   * This method gets the latest status of the tasks for a job from the Resource Manager for jobs submitted on hadoop
+   * 2.x version
+   *
+   * @param statusObj
+   */
+  private void getJsonsForTasks(MRStatusObject statusObj)
+  {
+    String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
+    String responseBody = MRUtil.getJsonForURL(url);
+
+    JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+    if (jsonObj == null) {
+      url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
+      responseBody = MRUtil.getJsonForURL(url);
+
+      jsonObj = MRUtil.getJsonObject(responseBody);
+    }
+
+    if (jsonObj != null) {
+
+      try {
+        Map<String, TaskObject> mapTaskOject = statusObj.getMapJsonObject();
+        Map<String, TaskObject> reduceTaskOject = statusObj.getReduceJsonObject();
+        JSONArray taskJsonArray = jsonObj.getJSONObject("tasks").getJSONArray("task");
+
+        for (int i = 0; i < taskJsonArray.length(); i++) {
+          JSONObject taskObj = taskJsonArray.getJSONObject(i);
+          if (Constants.REDUCE_TASK_TYPE.equalsIgnoreCase(taskObj.getString(Constants.TASK_TYPE))) {
+            if (reduceTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) {
+              TaskObject tempTaskObj = reduceTaskOject.get(taskObj.getString(Constants.TASK_ID));
+              if (tempTaskObj.getJsonString().equals(taskObj.toString())) {
+                continue;
+              }
+              tempTaskObj.setJson(taskObj);
+              tempTaskObj.setModified(true);
+              reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj);
+              continue;
+            }
+            reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj));
+          } else {
+            if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) {
+              TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID));
+              if (tempTaskObj.getJsonString().equals(taskObj.toString())) {
+                continue;
+              }
+              tempTaskObj.setJson(taskObj);
+              tempTaskObj.setModified(true);
+              mapTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj);
+              continue;
+            }
+            mapTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj));
+          }
+        }
+        statusObj.setMapJsonObject(mapTaskOject);
+        statusObj.setReduceJsonObject(reduceTaskOject);
+      } catch (Exception e) {
+        LOG.info("exception: {}", e.getMessage());
+      }
+    }
+
+  }
+
+  /**
+   * Refreshes the status of a job submitted on hadoop 1.x by fetching the job's
+   * <code>jobdetails.jsp</code> JSON. The task level details are refreshed on every successful fetch;
+   * the job itself is flagged as modified and stored in the job map only when the fetched JSON
+   * differs from the JSON of the entry already held in the map (or when no entry exists yet).
+   *
+   * @param statusObj the job whose status is refreshed
+   */
+  private void getJsonForLegacyJob(MRStatusObject statusObj)
+  {
+    String jobDetailsUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort()
+        + "/jobdetails.jsp?format=json&jobid=job_" + statusObj.getJobId();
+    JSONObject latestJson = MRUtil.getJsonObject(MRUtil.getJsonForURL(jobDetailsUrl));
+    if (latestJson == null) {
+      return;
+    }
+
+    MRStatusObject tracked = jobMap.get(statusObj.getJobId());
+    boolean unchanged = tracked != null && tracked.getJsonObject().toString().equals(latestJson.toString());
+
+    if (!unchanged) {
+      statusObj.setModified(true);
+      statusObj.setJsonObject(latestJson);
+    }
+
+    // the task details are refreshed whether or not the job level JSON changed
+    getJsonsForLegacyTasks(statusObj, "map");
+    getJsonsForLegacyTasks(statusObj, "reduce");
+
+    if (!unchanged) {
+      jobMap.put(statusObj.getJobId(), statusObj);
+    }
+  }
+
+  /**
+   * Refreshes the map or reduce task details of a job submitted on hadoop 1.x by paging through the
+   * job's <code>jobtasks.jsp</code> JSON. A task that is not known yet is added to the task map; a
+   * known task whose JSON changed is updated in place and flagged as modified; an unchanged task is
+   * left untouched.
+   * <p>
+   * Tasks are keyed by {@link Constants#LEAGACY_TASK_ID} for lookups, inserts and updates alike, so an
+   * update can never end up stored under a different key than the one it was found under.
+   * <p>
+   * Any exception raised while fetching or parsing is logged and ends the refresh; a page that cannot
+   * be parsed also ends the refresh early.
+   *
+   * @param statusObj the job whose tasks are refreshed; its job level JSON must already be set
+   * @param type      "map" (case insensitive) to refresh map tasks, anything else to refresh reduce tasks
+   */
+  private void getJsonsForLegacyTasks(MRStatusObject statusObj, String type)
+  {
+    try {
+      JSONObject jobJson = statusObj.getJsonObject();
+      int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
+      boolean mapTasks = type.equalsIgnoreCase("map");
+      Map<String, TaskObject> taskMap = mapTasks ? statusObj.getMapJsonObject() : statusObj.getReduceJsonObject();
+
+      int totalPagenums = (totalTasks / Constants.MAX_TASKS) + 1;
+      String baseUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobtasks.jsp?type=" + type + "&format=json&jobid=job_" + statusObj.getJobId() + "&pagenum=";
+
+      for (int pagenum = 1; pagenum <= totalPagenums; pagenum++) {
+        String responseBody = MRUtil.getJsonForURL(baseUrl + pagenum);
+
+        JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+        if (jsonObj == null) {
+          return;
+        }
+
+        JSONArray taskJsonArray = jsonObj.getJSONArray("tasksInfo");
+
+        for (int i = 0; i < taskJsonArray.length(); i++) {
+          JSONObject taskObj = taskJsonArray.getJSONObject(i);
+          String taskId = taskObj.getString(Constants.LEAGACY_TASK_ID);
+          TaskObject knownTask = taskMap.get(taskId);
+
+          if (knownTask == null) {
+            taskMap.put(taskId, new TaskObject(taskObj));
+          } else if (!knownTask.getJsonString().equals(taskObj.toString())) {
+            // The entry is already stored under taskId, so it is updated in place. It must not be
+            // re-inserted under Constants.TASK_ID, which is the key of the non legacy task JSON.
+            knownTask.setJson(taskObj);
+            knownTask.setModified(true);
+          }
+        }
+      }
+
+      if (mapTasks) {
+        statusObj.setMapJsonObject(taskMap);
+      } else {
+        statusObj.setReduceJsonObject(taskMap);
+      }
+    } catch (Exception e) {
+      LOG.info("exception: {}", e.getMessage());
+    }
+
+  }
+
+  /**
+   * Sleeps for the configured spin interval and then refreshes the status of one tracked job. The
+   * jobs are visited round robin: once the iterator over the job map is exhausted a new one is
+   * obtained. Jobs whose hadoop version is neither 1 nor 2 are skipped.
+   */
+  @Override
+  public void handleIdleTime()
+  {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ie) {
+      // Restore the interrupt status so that the code which interrupted this thread
+      // can still observe the request instead of having it silently discarded here.
+      Thread.currentThread().interrupt();
+    }
+    if (!iterator.hasNext()) {
+      iterator = jobMap.values().iterator();
+    }
+
+    if (iterator.hasNext()) {
+      MRStatusObject obj = iterator.next();
+      if (obj.getHadoopVersion() == 2) {
+        getJsonForJob(obj);
+      } else if (obj.getHadoopVersion() == 1) {
+        getJsonForLegacyJob(obj);
+      }
+    }
+  }
+
+  /**
+   * Positions the job iterator at the start of the job map and reads the idle sleep time from the
+   * {@link OperatorContext#SPIN_MILLIS} attribute of the given context.
+   *
+   * @param context the operator context the sleep time is read from
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    this.iterator = this.jobMap.values().iterator();
+    this.sleepTime = context.getValue(OperatorContext.SPIN_MILLIS);
+  }
+
+  /**
+   * Intentionally empty; no teardown work is performed.
+   */
+  @Override
+  public void teardown()
+  {
+    // nothing to do
+  }
+
+  /**
+   * Intentionally empty; nothing is done at the start of a window.
+   *
+   * @param arg0 the window id, unused
+   */
+  @Override
+  public void beginWindow(long arg0)
+  {
+    // nothing to do
+  }
+
+  private void emitHelper(MRStatusObject obj)
+  {
+    try {
+      obj.setModified(false);
+      output.emit(obj.getJsonObject().toString());
+      JSONObject outputJsonObject = new JSONObject();
+
+      outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+      outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory()));
+      outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory()));
+      outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory()));
+      outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory()));
+      outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory()));
+      output.emit(outputJsonObject.toString());
+      obj.setChangedHistoryStatus(false);
+
+      outputJsonObject = new JSONObject();
+      outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+      JSONArray arr = new JSONArray();
+
+      for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) {
+        TaskObject json = mapEntry.getValue();
+        json.setModified(false);
+        arr.put(json.getJson());
+      }
+
+      outputJsonObject.put("tasks", arr);
+      mapOutput.emit(outputJsonObject.toString());
+
+      outputJsonObject = new JSONObject();
+      outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+      arr = new JSONArray();
+
+      for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) {
+        TaskObject json = mapEntry.getValue();
+        json.setModified(false);
+        arr.put(json.getJson());
+      }
+
+      outputJsonObject.put("tasks", arr);
+      reduceOutput.emit(outputJsonObject.toString());
+      obj.setRetrials(0);
+    } catch (Exception e) {
+      LOG.warn("error creating json {}", e.getMessage());
+    }
+
+  }
+
+  /**
+   * Emits, for every tracked job, whatever changed since the last emission: the job JSON (plus the
+   * status histories when those changed) on the job output port, the modified map and reduce tasks
+   * on their respective ports and the modified metrics on the counter port. Every flag that caused
+   * an emission is reset.
+   * <p>
+   * A job for which nothing changed has its retrial counter incremented; once the counter has
+   * reached the maximum number of retrials the job is removed from the job map after all jobs have
+   * been visited. Any change resets the counter to zero.
+   * <p>
+   * An exception raised while visiting the jobs is logged and ends the visit; jobs already marked
+   * for removal are still removed.
+   */
+  @Override
+  public void endWindow()
+  {
+    List<String> staleJobIds = new ArrayList<String>();
+    try {
+      for (MRStatusObject job : jobMap.values()) {
+        final String jobKey = JOB_PREFIX + job.getJobId();
+        boolean changed = false;
+
+        if (job.isModified()) {
+          changed = true;
+          job.setModified(false);
+          output.emit(job.getJsonObject().toString());
+          if (job.isChangedHistoryStatus()) {
+            JSONObject history = new JSONObject();
+            history.put("id", jobKey);
+            history.put("mapHistory", new JSONArray(job.getMapStatusHistory()));
+            history.put("reduceHistory", new JSONArray(job.getReduceStatusHistory()));
+            history.put("physicalMemoryHistory", new JSONArray(job.getPhysicalMemeoryStatusHistory()));
+            history.put("virtualMemoryHistory", new JSONArray(job.getVirtualMemoryStatusHistory()));
+            history.put("cpuHistory", new JSONArray(job.getCpuStatusHistory()));
+            output.emit(history.toString());
+            job.setChangedHistoryStatus(false);
+          }
+        }
+
+        // modified map tasks only
+        JSONArray changedMapTasks = new JSONArray();
+        for (TaskObject task : job.getMapJsonObject().values()) {
+          if (task.isModified()) {
+            changed = true;
+            task.setModified(false);
+            changedMapTasks.put(task.getJson());
+          }
+        }
+        if (changedMapTasks.length() > 0) {
+          JSONObject mapPayload = new JSONObject();
+          mapPayload.put("id", jobKey);
+          mapPayload.put("tasks", changedMapTasks);
+          mapOutput.emit(mapPayload.toString());
+        }
+
+        // modified reduce tasks only
+        JSONArray changedReduceTasks = new JSONArray();
+        for (TaskObject task : job.getReduceJsonObject().values()) {
+          if (task.isModified()) {
+            changed = true;
+            task.setModified(false);
+            changedReduceTasks.put(task.getJson());
+          }
+        }
+        if (changedReduceTasks.length() > 0) {
+          JSONObject reducePayload = new JSONObject();
+          reducePayload.put("id", jobKey);
+          reducePayload.put("tasks", changedReduceTasks);
+          reduceOutput.emit(reducePayload.toString());
+        }
+
+        if (job.getMetricObject() != null && job.getMetricObject().isModified()) {
+          changed = true;
+          job.getMetricObject().setModified(false);
+          counterOutput.emit(job.getMetricObject().getJsonString());
+        }
+
+        if (changed) {
+          job.setRetrials(0);
+        } else if (job.getRetrials() >= maxRetrials) {
+          staleJobIds.add(job.getJobId());
+        } else {
+          job.setRetrials(job.getRetrials() + 1);
+        }
+      }
+    } catch (Exception ex) {
+      LOG.warn("error creating json {}", ex.getMessage());
+    }
+
+    // removal is deferred until here so that the job map is not altered while it is iterated
+    for (String staleJobId : staleJobIds) {
+      removeJob(staleJobId);
+    }
+
+  }
+
+  /**
+   * Removes a job from the job map and restarts the job iterator on the updated map. Does nothing
+   * when there is no job map.
+   *
+   * @param jobId the id of the job to remove
+   */
+  public void removeJob(String jobId)
+  {
+    if (jobMap == null) {
+      return;
+    }
+    jobMap.remove(jobId);
+    iterator = jobMap.values().iterator();
+  }
+
+  /**
+   * Removes every job from the job map and restarts the job iterator on the emptied map. Does
+   * nothing when there is no job map.
+   */
+  public void clearMap()
+  {
+    if (jobMap == null) {
+      return;
+    }
+    jobMap.clear();
+    iterator = jobMap.values().iterator();
+  }
+
+  /**
+   * This returns the maximum number of jobs a single instance of this operator is going to serve at any time
+   *
+   * @return the maximum number of jobs
+   */
+  public int getMaxJobs()
+  {
+    return maxJobs;
+  }
+
+  /**
+   * This sets the maximum number of jobs a single instance of this operator is going to serve at any time
+   *
+   * @param maxJobs the maximum number of jobs
+   */
+  public void setMaxJobs(int maxJobs)
+  {
+    this.maxJobs = maxJobs;
+  }
+
+  /**
+   * This returns the number of consecutive windows of no change before the job is removed from map
+   *
+   * @return the number of consecutive windows of no change
+   */
+  public int getMaxRetrials()
+  {
+    return maxRetrials;
+  }
+
+  /**
+   * This sets the number of consecutive windows of no change before the job is removed from map
+   *
+   * @param maxRetrials the number of consecutive windows of no change
+   */
+  public void setMaxRetrials(int maxRetrials)
+  {
+    this.maxRetrials = maxRetrials;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
new file mode 100644
index 0000000..288da84
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+
+/**
+ * <p>
+ * MRMonitoringApplication class.
+ * </p>
+ * Wires a websocket query input through a query converter into the job status operator, and
+ * publishes each of the job status operator's four output ports to the gateway over its own
+ * websocket output operator.
+ *
+ * @since 0.3.4
+ */
+@ApplicationAnnotation(name = "MRMonitoringExample")
+public class MRMonitoringApplication implements StreamingApplication
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(MRMonitoringApplication.class);
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+    MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator());
+    URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
+    logger.info("WebSocket with daemon at {}", daemonAddress);
+
+    PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator());
+    wsIn.setUri(uri);
+
+    MapToMRObjectOperator queryConverter = dag.addOperator("QueryConverter", new MapToMRObjectOperator());
+
+    // meta data about the job
+    PubSubWebSocketOutputOperator<Object> wsOut = addWebSocketSink(dag, "JobOutput", uri);
+    // information of the map tasks of the job
+    PubSubWebSocketOutputOperator<Object> wsMapOut = addWebSocketSink(dag, "MapJob", uri);
+    // information of the reduce tasks of the job
+    PubSubWebSocketOutputOperator<Object> wsReduceOut = addWebSocketSink(dag, "ReduceJob", uri);
+    // metric information of the job
+    PubSubWebSocketOutputOperator<Object> wsCounterOut = addWebSocketSink(dag, "JobCounter", uri);
+
+    dag.addStream("QueryConversion", wsIn.outputPort, queryConverter.input);
+    dag.addStream("QueryProcessing", queryConverter.output, mrJobOperator.input);
+    dag.addStream("JobData", mrJobOperator.output, wsOut.input);
+    dag.addStream("MapData", mrJobOperator.mapOutput, wsMapOut.input);
+    dag.addStream("ReduceData", mrJobOperator.reduceOutput, wsReduceOut.input);
+    dag.addStream("CounterData", mrJobOperator.counterOutput, wsCounterOut.input);
+  }
+
+  /**
+   * Adds a websocket output operator with the given name to the DAG and points it at the given URI.
+   *
+   * @param dag  the DAG the operator is added to
+   * @param name the operator name
+   * @param uri  the websocket URI the operator publishes to
+   * @return the operator that was added
+   */
+  private static PubSubWebSocketOutputOperator<Object> addWebSocketSink(DAG dag, String name, URI uri)
+  {
+    PubSubWebSocketOutputOperator<Object> sink = dag.addOperator(name, new PubSubWebSocketOutputOperator<Object>());
+    sink.setUri(uri);
+    return sink;
+  }
+
+}