You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:11 UTC

[06/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
new file mode 100644
index 0000000..ae8b551
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * MRStatusObject class.
+ * </p>
+ *
+ * @since 0.3.4
+ */
+public class MRStatusObject
+{
+  private String command;
+  /**
+   * This stores the Resource Manager/ Task Manager's host information
+   */
+  private String uri;
+  /**
+   * This field stores the job id
+   */
+  private String jobId;
+  /**
+   * This field stores the api version of the rest apis
+   */
+  private String apiVersion;
+  /**
+   * This field stores the hadoop version 1 for 1.x and 2 for 2.x
+   */
+  private int hadoopVersion;
+  /**
+   * This field stores the app id for the hadoop 2.x
+   */
+  private String appId;
+  /**
+   * This field stores the RM port information for hadoop 2.x / Task Manager server port for hadoop 1.X from where we
+   * can get the job information
+   */
+  private int rmPort;
+  /**
+   * This field stores the history server information for hadoop 2.x from where we can get the job information
+   */
+  private int historyServerPort;
+  /**
+   * This field stores the job information as json object
+   */
+  private JSONObject jsonObject;
+  /**
+   * This field tells if the object has been modified
+   */
+  private boolean modified;
+  /**
+   * This stores the mapping of map task ids to the TaskObject
+   */
+  private Map<String, TaskObject> mapJsonObject;
+  /**
+   * This stores the mapping of reduce task ids to the TaskObject
+   */
+  private Map<String, TaskObject> reduceJsonObject;
+  /**
+   * This holds the information about the various metrics like MAP_OUTPUT_RECORDS etc for this job
+   */
+  private TaskObject metricObject;
+
+  /**
+   * This holds the number of windows occurred when the new data was retrieved for this job
+   */
+  private int retrials;
+
+  /**
+   * The scheduler is used to store the job status every 1 minute
+   */
+  private transient ScheduledExecutorService statusScheduler;
+
+  /**
+   * This stores the progress of the map tasks
+   */
+  Queue<String> mapStatusHistory;
+
+  /**
+   * This stores the progress of the reduce tasks
+   */
+  Queue<String> reduceStatusHistory;
+
+  /**
+   * This stores the history of the physical memory usage
+   */
+  Queue<String> physicalMemoryStatusHistory;
+
+  /**
+   * This stores the history of the virtual memory usage
+   */
+  Queue<String> virtualMemoryStatusHistory;
+
+  /**
+   * This stores the history of the cpu
+   */
+  Queue<String> cpuStatusHistory;
+
+  /**
+   * The number of minutes for which the status history of map and reduce tasks is stored
+   */
+  private int statusHistoryCount = 60;
+
+  /**
+   * This field notifies if history status queues have changed over time
+   */
+  private boolean changedHistoryStatus;
+
+  public MRStatusObject()
+  {
+    retrials = 0;
+    modified = true;
+    mapJsonObject = new ConcurrentHashMap<String, TaskObject>();
+    reduceJsonObject = new ConcurrentHashMap<String, TaskObject>();
+    mapStatusHistory = new LinkedList<String>();
+    reduceStatusHistory = new LinkedList<String>();
+    physicalMemoryStatusHistory = new LinkedList<String>();
+    virtualMemoryStatusHistory = new LinkedList<String>();
+    cpuStatusHistory = new LinkedList<String>();
+    statusScheduler = Executors.newScheduledThreadPool(1);
+    statusScheduler.scheduleAtFixedRate(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        if (jsonObject != null) {
+          changedHistoryStatus = true;
+          if (mapStatusHistory.size() > statusHistoryCount) {
+            mapStatusHistory.poll();
+            reduceStatusHistory.poll();
+            physicalMemoryStatusHistory.poll();
+            virtualMemoryStatusHistory.poll();
+            cpuStatusHistory.poll();
+          }
+          if (hadoopVersion == 2) {
+            try {
+              mapStatusHistory.add(jsonObject.getJSONObject("job").getString("mapProgress"));
+              reduceStatusHistory.add(jsonObject.getJSONObject("job").getString("reduceProgress"));
+              if (metricObject.getJson() != null) {
+                JSONArray arr = metricObject.getJson().getJSONObject("jobCounters").getJSONArray("counterGroup");
+                int length = arr.length();
+                for (int i = 0; i < length; i++) {
+                  if (arr.getJSONObject(i).get("counterGroupName").equals("org.apache.hadoop.mapreduce.TaskCounter")) {
+                    JSONArray counters = arr.getJSONObject(i).getJSONArray("counter");
+                    for (int j = 0; j < counters.length(); j++) {
+                      JSONObject counterObj = counters.getJSONObject(j);
+                      if (counterObj.get("name").equals("PHYSICAL_MEMORY_BYTES")) {
+                        physicalMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
+                      } else if (counterObj.get("name").equals("VIRTUAL_MEMORY_BYTES")) {
+                        virtualMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
+                      } else if (counterObj.get("name").equals("CPU_MILLISECONDS")) {
+                        cpuStatusHistory.add(counterObj.getString("totalCounterValue"));
+                      }
+                    }
+                    break;
+                  }
+                }
+              }
+            } catch (JSONException e) {
+              logger.error("error setting status history {}", e.getMessage());
+            }
+          } else {
+            try {
+              mapStatusHistory.add(jsonObject.getJSONObject("mapTaskSummary").getString("progressPercentage"));
+              reduceStatusHistory.add(jsonObject.getJSONObject("reduceTaskSummary").getString("progressPercentage"));
+              JSONArray arr = jsonObject.getJSONArray("jobCounterGroupsInfo");
+              int length = arr.length();
+              for (int i = 0; i < length; i++) {
+                if (arr.getJSONObject(i).get("groupName").equals("Map-Reduce Framework")) {
+                  JSONArray counters = arr.getJSONObject(i).getJSONArray("jobCountersInfo");
+                  for (int j = 0; j < counters.length(); j++) {
+                    JSONObject counterObj = counters.getJSONObject(j);
+                    if (counterObj.get("name").equals("Physical memory (bytes) snapshot")) {
+                      physicalMemoryStatusHistory.add(counterObj.getString("totalValue"));
+                    } else if (counterObj.get("name").equals("Virtual memory (bytes) snapshot")) {
+                      virtualMemoryStatusHistory.add(counterObj.getString("totalValue"));
+                    } else if (counterObj.get("name").equals("CPU time spent (ms)")) {
+                      cpuStatusHistory.add(counterObj.getString("totalValue"));
+                    }
+                  }
+                  break;
+                }
+              }
+            } catch (JSONException e) {
+              logger.error("error setting status history {}", e.getMessage());
+            }
+          }
+        }
+      }
+    }, 0, 1, TimeUnit.MINUTES);
+  }
+
+  public Map<String, TaskObject> getMapJsonObject()
+  {
+    return mapJsonObject;
+  }
+
+  public void setMapJsonObject(Map<String, TaskObject> mapJsonObject)
+  {
+    this.mapJsonObject = mapJsonObject;
+  }
+
+  public Map<String, TaskObject> getReduceJsonObject()
+  {
+    return reduceJsonObject;
+  }
+
+  public void setReduceJsonObject(Map<String, TaskObject> reduceJsonObject)
+  {
+    this.reduceJsonObject = reduceJsonObject;
+  }
+
+  public String getUri()
+  {
+    return uri;
+  }
+
+  public void setUri(String uri)
+  {
+    this.uri = uri;
+  }
+
+  public String getJobId()
+  {
+    return jobId;
+  }
+
+  public void setJobId(String jobId)
+  {
+    this.jobId = jobId;
+  }
+
+  public String getApiVersion()
+  {
+    return apiVersion;
+  }
+
+  public void setApiVersion(String apiVersion)
+  {
+    this.apiVersion = apiVersion;
+  }
+
+  public int getHadoopVersion()
+  {
+    return hadoopVersion;
+  }
+
+  public void setHadoopVersion(int hadoopVersion)
+  {
+    this.hadoopVersion = hadoopVersion;
+  }
+
+  public String getAppId()
+  {
+    return appId;
+  }
+
+  public void setAppId(String appId)
+  {
+    this.appId = appId;
+  }
+
+  public int getRmPort()
+  {
+    return rmPort;
+  }
+
+  public void setRmPort(int rmPort)
+  {
+    this.rmPort = rmPort;
+  }
+
+  public int getHistoryServerPort()
+  {
+    return historyServerPort;
+  }
+
+  public void setHistoryServerPort(int historyServerPort)
+  {
+    this.historyServerPort = historyServerPort;
+  }
+
+  public JSONObject getJsonObject()
+  {
+    return jsonObject;
+  }
+
+  public void setJsonObject(JSONObject jsonObject)
+  {
+    this.jsonObject = jsonObject;
+  }
+
+  public boolean isChangedHistoryStatus()
+  {
+    return changedHistoryStatus;
+  }
+
+  public void setChangedHistoryStatus(boolean changedHistoryStatus)
+  {
+    this.changedHistoryStatus = changedHistoryStatus;
+  }
+
+  @Override
+  public boolean equals(Object that)
+  {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof MRStatusObject)) {
+      return false;
+    }
+    if (this.hashCode() == that.hashCode()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return (uri + jobId + apiVersion + String.valueOf(hadoopVersion)).hashCode();
+
+  }
+
+  public String getCommand()
+  {
+    return command;
+  }
+
+  public void setCommand(String command)
+  {
+    this.command = command;
+  }
+
+  public boolean isModified()
+  {
+    return modified;
+  }
+
+  public void setModified(boolean modified)
+  {
+    this.modified = modified;
+  }
+
+  public int getRetrials()
+  {
+    return retrials;
+  }
+
+  public void setRetrials(int retrials)
+  {
+    this.retrials = retrials;
+  }
+
+  public TaskObject getMetricObject()
+  {
+    return metricObject;
+  }
+
+  public void setMetricObject(TaskObject metricObject)
+  {
+    this.metricObject = metricObject;
+  }
+
+  public int getStatusHistoryCount()
+  {
+    return statusHistoryCount;
+  }
+
+  public void setStatusHistoryCount(int statusHistoryCount)
+  {
+    this.statusHistoryCount = statusHistoryCount;
+  }
+
+  public Queue<String> getMapStatusHistory()
+  {
+    return mapStatusHistory;
+  }
+
+  public Queue<String> getReduceStatusHistory()
+  {
+    return reduceStatusHistory;
+  }
+
+  public Queue<String> getPhysicalMemeoryStatusHistory()
+  {
+    return physicalMemoryStatusHistory;
+  }
+
+  public Queue<String> getVirtualMemoryStatusHistory()
+  {
+    return virtualMemoryStatusHistory;
+  }
+
+  public Queue<String> getCpuStatusHistory()
+  {
+    return cpuStatusHistory;
+  }
+
+  public static class TaskObject
+  {
+    /**
+     * This field stores the task information as json
+     */
+    private JSONObject json;
+    /**
+     * This field tells if the object was modified
+     */
+    private boolean modified;
+
+    public TaskObject(JSONObject json)
+    {
+      modified = true;
+      this.json = json;
+    }
+
+    /**
+     * This returns the task information as json
+     *
+     * @return
+     */
+    public JSONObject getJson()
+    {
+      return json;
+    }
+
+    /**
+     * This stores the task information as json
+     *
+     * @param json
+     */
+    public void setJson(JSONObject json)
+    {
+      this.json = json;
+    }
+
+    /**
+     * This returns if the json object has been modified
+     *
+     * @return
+     */
+    public boolean isModified()
+    {
+      return modified;
+    }
+
+    /**
+     * This sets if the json object is modified
+     *
+     * @param modified
+     */
+    public void setModified(boolean modified)
+    {
+      this.modified = modified;
+    }
+
+    /**
+     * This returns the string format of the json object
+     *
+     * @return
+     */
+    public String getJsonString()
+    {
+      return json.toString();
+    }
+  }
+
+  private static Logger logger = LoggerFactory.getLogger(MRStatusObject.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
new file mode 100644
index 0000000..1a82b99
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.io.IOException;
+
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+/**
+ * <p>
+ * Util class.
+ * </p>
+ *
+ * @since 0.3.4
+ */
+public class MRUtil
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(MRUtil.class);
+
+  /**
+   * This method returns the response content for a given url
+   * @param url
+   * @return
+   */
+  public static String getJsonForURL(String url)
+  {
+    HttpClient httpclient = new DefaultHttpClient();
+    logger.debug(url);
+    try {
+
+
+      HttpGet httpget = new HttpGet(url);
+
+      // Create a response handler
+      ResponseHandler<String> responseHandler = new BasicResponseHandler();
+      String responseBody;
+      try {
+        responseBody = httpclient.execute(httpget, responseHandler);
+
+      } catch (ClientProtocolException e) {
+        logger.debug(e.getMessage());
+        return null;
+
+      } catch (IOException e) {
+        logger.debug(e.getMessage());
+        return null;
+      } catch (Exception e) {
+        logger.debug(e.getMessage());
+        return null;
+      }
+      return responseBody.trim();
+    } finally {
+      httpclient.getConnectionManager().shutdown();
+    }
+  }
+
+  /**
+   * This method returns the JSONObject for a given string
+   * @param json
+   * @return
+   */
+  public static JSONObject getJsonObject(String json)
+  {
+    try {
+      JSONObject jsonObj = new JSONObject(json);
+      return jsonObj;
+    } catch (Exception e) {
+      logger.debug("{}", e.getMessage());
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
new file mode 100644
index 0000000..f4ce4fb
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * <p>MapToMRObjectOperator class.</p>
+ *
+ * @since 0.9.0
+ */
+public class MapToMRObjectOperator implements Operator
+{
+
+  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
+  {
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      MRStatusObject mrStatusObj = new MRStatusObject();
+
+      for (Map.Entry<String, String> e : tuple.entrySet()) {
+        if (e.getKey().equals(Constants.QUERY_KEY_COMMAND)) {
+          mrStatusObj.setCommand(e.getValue());
+        } else if (e.getKey().equals(Constants.QUERY_API_VERSION)) {
+          mrStatusObj.setApiVersion(e.getValue());
+        } else if (e.getKey().equals(Constants.QUERY_APP_ID)) {
+          mrStatusObj.setAppId(e.getValue());
+        } else if (e.getKey().equals(Constants.QUERY_HADOOP_VERSION)) {
+          mrStatusObj.setHadoopVersion(Integer.parseInt(e.getValue()));
+        } else if (e.getKey().equals(Constants.QUERY_HOST_NAME)) {
+          mrStatusObj.setUri(e.getValue());
+        } else if (e.getKey().equals(Constants.QUERY_HS_PORT)) {
+          mrStatusObj.setHistoryServerPort(Integer.parseInt(e.getValue()));
+        } else if (e.getKey().equals(Constants.QUERY_JOB_ID)) {
+          mrStatusObj.setJobId(e.getValue());
+        } else if (e.getKey().equals(Constants.QUERY_RM_PORT)) {
+          mrStatusObj.setRmPort(Integer.parseInt(e.getValue()));
+        }
+      }
+      output.emit(mrStatusObj);
+
+    }
+  };
+
+  public final transient DefaultOutputPort<MRStatusObject> output = new DefaultOutputPort<MRStatusObject>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/resources/META-INF/properties.xml b/examples/mrmonitor/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..fdda52b
--- /dev/null
+++ b/examples/mrmonitor/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
+    <value>4</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
+    <value>contrib.summit.mrDebugger.jobResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mapResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.reduceResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
+    <value>contrib.summit.mrDebugger.counterResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/mrdebugger.html
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/resources/mrdebugger.html b/examples/mrmonitor/src/main/resources/mrdebugger.html
new file mode 100644
index 0000000..dddde06
--- /dev/null
+++ b/examples/mrmonitor/src/main/resources/mrdebugger.html
@@ -0,0 +1,237 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!doctype html>
+<html>
+<head>
+<title>Mobile Example</title>
+
+<META HTTP-EQUIV="CACHE-CONTROL" CONTENT="NO-CACHE">
+<meta name="viewport" content="initial-scale=1.0, user-scalable=no">
+<meta charset="utf-8">
+<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
+<script src="http://ajax.googleapis.com/ajax/libs/jqueryui/1.10.3/jquery-ui.min.js"></script>
+<!--script src="js/vendor/jquery/dist/jquery.js"></script-->
+<style>
+	body {
+        margin: 0;
+	}
+
+    .phone-input {
+        margin-left: 0.5em;
+        margin-right: 0.5em;
+    }
+</style>
+
+</head>
+
+
+<body>
+
+
+<script>
+
+
+
+var map;
+var markers = {};
+
+$(function() {
+
+    $("#query1AddButton").click(function() {
+    
+	    var app_id = $("input#app_id").val();
+	    var hostname = $("input#hostname").val();
+	    var job_id = $("input#job_id").val();
+	    var hadoop_version= $("input#hadoop_version").val();
+	    var api_version = $("input#api_version").val();
+	    var rm_port = $("input#rm_port").val();
+	    var hs_port = $("input#hs_port").val();
+	    
+	    var jsonData = {
+		command : 'add',
+		hostname:hostname,
+		app_id:app_id,
+		job_id:job_id,
+		hadoop_version:hadoop_version,
+		api_version:api_version,
+		rm_port:rm_port,
+		hs_port:hs_port
+	    };
+    
+	    sendQuery(jsonData, function() {
+	        $('#query1SubmitConfirm').html("<div id='message'></div>");
+	        $('#message').html("<h2>Add   submitted to application!</h2>")
+	        .append("<p>Result will appear on page shortly.</p>");
+            });
+	    
+	    return false;
+    });
+
+    $("#query1DeleteButton").click(function() {
+    
+	    var job_id = $("input#job_id").val();
+	    
+	    var jsonData = {
+		command : 'delete',
+	        query : job_id 
+	    };
+    
+	    sendQuery(jsonData, function() {
+	        $('#query1SubmitConfirm').html("<div id='message'></div>");
+	        $('#message').html("<h2>Add " + phone + " submitted to application!</h2>")
+	        .append("<p>Result will appear on page shortly.</p>");
+            });
+	    
+	    return false;
+    });
+
+    function sendQuery(jsonData, callback) {
+        var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
+
+        ws.onopen = function () {
+          var topic = "contrib.summit.mrDebugger.mrDebuggerQuery";  
+          var msg = JSON.stringify({ "type" : "publish", "topic" : topic, "data" : jsonData });
+          ws.send(msg);
+          console.log("published to: " + topic + " data: " + msg);
+          ws.close();
+          if (callback) callback();
+        };
+
+        ws.onerror = function (error) {
+          console.log('WebSocket Error ' + error);
+        };
+
+        ws.onmessage = function (e) {
+          console.log('Server: ' + e.data);
+        };
+        ws.onclose = function (e) {
+            console.log('close: ' , e);
+        };
+
+    }
+
+    var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
+    var topic = "contrib.summit.mrDebugger.jobResult";  
+
+    ws.onopen = function () {
+      var msg = JSON.stringify({ "type":"subscribe", "topic": topic});
+      console.log("sending: " + msg);
+      ws.send(msg);
+    };
+
+    ws.onerror = function (error) {
+      console.log('WebSocket Error ' + error);
+    };
+
+    ws.onmessage = function (e){ 
+
+	$('#jobQueryResult').append(e.data+"\n");	
+    };      
+    
+
+    var mapws = new WebSocket('ws://'+window.location.host+'/pubsub');
+    var maptopic = "contrib.summit.mrDebugger.mapResult";  
+
+    mapws.onopen = function () {
+      var msg = JSON.stringify({ "type":"subscribe", "topic": maptopic});
+      console.log("sending: " + msg);
+      mapws.send(msg);
+    };
+
+    mapws.onerror = function (error) {
+      console.log('WebSocket Error ' + error);
+    };
+
+    mapws.onmessage = function (e){ 
+
+	$('#jobMapQueryResult').append(e.data+"\n");	
+    };      
+
+
+    var reducews = new WebSocket('ws://'+window.location.host+'/pubsub');
+    var reducetopic = "contrib.summit.mrDebugger.reduceResult";  
+
+    reducews.onopen = function () {
+      var msg = JSON.stringify({ "type":"subscribe", "topic": reducetopic});
+      console.log("sending: " + msg);
+      reducews.send(msg);
+    };
+
+    reducews.onerror = function (error) {
+      console.log('WebSocket Error ' + error);
+    };
+
+    reducews.onmessage = function (e){ 
+
+	$('#jobReduceQueryResult').append(e.data+"\n");	
+    };      
+    
+  });
+
+
+</script>
+
+
+<div id="query1FormDiv">
+		  <form name="query1" action="">
+		    <p>
+		      <label for="phone" id="app_id_label">Application Id</label>
+		      <input type="text" name="app_id" id="app_id" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="job_id_label">Job Id</label>
+		      <input type="text" name="job_id" id="job_id" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="hostname_label">Hostname</label>
+		      <input type="text" name="hostname" id="hostname" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="rm_port_label">RM port</label>
+		      <input type="text" name="rm_port" id="rm_port" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="hs_port_label">History Server port</label>
+		      <input type="text" name="hs_port" id="hs_port" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="hadoop_version_label">Hadoop Version</label>
+		      <input type="text" name="hadoop_version" id="hadoop_version" size="30" value="" class="phone-input" />
+		    </p>
+		    <p>
+		      <label for="phone" id="api_version_label">API Version</label>
+		      <input type="text" name="api_version" id="api_version" size="30" value="" class="phone-input" />
+		    </p>
+            <p>
+		      <input type="submit" name="command" class="button" id="query1AddButton" value="Add" />
+		      <input type="submit" name="command" class="button" id="query1DeleteButton" value="Delete" />
+		      <input type="submit" name="command" class="button" id="query1ClearButton" value="Clear" />
+            </p>
+		  </form>
+		  <div id="query1SubmitConfirm"></div>
+		  <div>Job: <span id="jobQueryResult"></span></div>
+		  <div>Map Task: <span id="jobMapQueryResult"></span></div>
+		  <div>Reduce Job: <span id="jobReduceQueryResult"></span></div>
+		</div>
+
+</body>
+</html>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
new file mode 100644
index 0000000..b094ddb
--- /dev/null
+++ b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import javax.servlet.Servlet;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
+
+/**
+ * <p>MapReduceDebuggerApplicationTest class.</p>
+ *
+ * @since 0.3.4
+ */
+
+public class MrMonitoringApplicationTest
+{
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-monitoring.xml");
+    Server server = new Server(0);
+    Servlet servlet = new SamplePubSubWebSocketServlet();
+    ServletHolder sh = new ServletHolder(servlet);
+    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
+    contextHandler.addServlet(sh, "/pubsub");
+    contextHandler.addServlet(sh, "/*");
+    server.start();
+    Connector[] connector = server.getConnectors();
+    conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
+
+    MRMonitoringApplication application = new MRMonitoringApplication();
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(application, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+    server.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
new file mode 100644
index 0000000..88bae44
--- /dev/null
+++ b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.INITIAL_PARTITION_COUNT</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
+    <value>4</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
+    <value>contrib.summit.mrDebugger.jobResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mapResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.reduceResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
+    <value>contrib.summit.mrDebugger.counterResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/resources/log4j.properties b/examples/mrmonitor/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mrmonitor/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/pom.xml b/examples/mroperator/pom.xml
new file mode 100644
index 0000000..c9a7b65
--- /dev/null
+++ b/examples/mroperator/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <artifactId>malhar-examples-mroperator</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar MR Operator Example</name>
+  <description></description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/assemble/appPackage.xml b/examples/mroperator/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mroperator/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
new file mode 100644
index 0000000..4cf1f5a
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>DateWritable class.</p>
+ *
+ * @since 0.9.0
+ */
+public class DateWritable implements WritableComparable<DateWritable>
+{
+  private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
+  private Date date;
+
+  public Date getDate()
+  {
+    return date;
+  }
+
+  public void setDate( Date date )
+  {
+    this.date = date;
+  }
+
+  public void readFields( DataInput in ) throws IOException
+  {
+    date = new Date( in.readLong() );
+  }
+
+  public void write( DataOutput out ) throws IOException
+  {
+    out.writeLong( date.getTime() );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    return toString().equals(o.toString());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return toString().hashCode();
+  }
+
+  public String toString()
+  {
+    return formatter.format( date);
+  }
+
+  public int compareTo( DateWritable other )
+  {
+    return date.compareTo( other.getDate() );
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
new file mode 100644
index 0000000..94e17c1
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * Adapter for writing KeyHashValPair objects to HDFS
+ * <p>
+ * Serializes tuples into a HDFS file.<br/>
+ * </p>
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ * @since 0.9.4
+ */
+public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOperator<KeyHashValPair<K, V>>
+{
+  @Override
+  public byte[] getBytesForTuple(KeyHashValPair<K,V> t)
+  {
+    return (t.toString() + "\n").getBytes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
new file mode 100644
index 0000000..af97bc4
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * <p>InvertedIndexApplication class.</p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "InvertedIndexExample")
+public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
+{
+
+  InvertedIndexApplication()
+  {
+    setMapClass(LineIndexer.LineIndexMapper.class);
+    setReduceClass(LineIndexer.LineIndexReducer.class);
+
+    setInputFormat(TextInputFormat.class);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
new file mode 100644
index 0000000..a2c589d
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * <p>LineIndexer class.</p>
+ *
+ * @since 0.9.0
+ */
+public class LineIndexer
+{
+
+  public static class LineIndexMapper extends MapReduceBase
+      implements Mapper<LongWritable, Text, Text, Text>
+  {
+    private static final Text word = new Text();
+    private static final Text location = new Text();
+
+    public void map(LongWritable key, Text val,
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
+      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
+      String fileName = fileSplit.getPath().getName();
+      location.set(fileName);
+
+      String line = val.toString();
+      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, location);
+      }
+    }
+  }
+
+
+
+  public static class LineIndexReducer extends MapReduceBase
+      implements Reducer<Text, Text, Text, Text>
+  {
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
+      boolean first = true;
+      StringBuilder toReturn = new StringBuilder();
+      while (values.hasNext()) {
+        if (!first) {
+          toReturn.append(", ");
+        }
+        first = false;
+        toReturn.append(values.next().toString());
+      }
+
+      output.collect(key, new Text(toReturn.toString()));
+    }
+  }
+
+
+  /**
+   * The actual main() method for our program; this is the
+   * "driver" for the MapReduce job.
+   */
+  public static void main(String[] args)
+  {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(LineIndexer.class);
+
+    conf.setJobName("LineIndexer");
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.addInputPath(conf, new Path("input"));
+    FileOutputFormat.setOutputPath(conf, new Path("output"));
+
+    conf.setMapperClass(LineIndexMapper.class);
+    conf.setReducerClass(LineIndexReducer.class);
+
+    client.setConf(conf);
+
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
new file mode 100644
index 0000000..6255810
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * <p>LogCountsPerHour class.</p>
+ *
+ * @since 0.9.0
+ */
+public class LogCountsPerHour extends Configured implements Tool
+{
+
+  public static class LogMapClass extends MapReduceBase
+      implements Mapper<LongWritable, Text, DateWritable, IntWritable>
+  {
+    private DateWritable date = new DateWritable();
+    private static final IntWritable one = new IntWritable(1);
+
+    public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
+    {
+      // Get the value as a String; it is of the format:
+      // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
+      String text = value.toString();
+
+      // Get the date and time
+      int openBracket = text.indexOf('[');
+      int closeBracket = text.indexOf(']');
+      if (openBracket != -1 && closeBracket != -1) {
+        // Read the date
+        String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
+
+        // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
+        int index = 0;
+        int nextIndex = dateString.indexOf('/');
+        int day = Integer.parseInt(dateString.substring(index, nextIndex));
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf('/', index + 1);
+        String month = dateString.substring(index + 1, nextIndex);
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf(':', index);
+        int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+        index = nextIndex;
+        nextIndex = dateString.indexOf(':', index + 1);
+        int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+        // Build a calendar object for this date
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(Calendar.DATE, day);
+        calendar.set(Calendar.YEAR, year);
+        calendar.set(Calendar.HOUR, hour);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        if (month.equalsIgnoreCase("dec")) {
+          calendar.set(Calendar.MONTH, Calendar.DECEMBER);
+        } else if (month.equalsIgnoreCase("nov")) {
+          calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
+        } else if (month.equalsIgnoreCase("oct")) {
+          calendar.set(Calendar.MONTH, Calendar.OCTOBER);
+        } else if (month.equalsIgnoreCase("sep")) {
+          calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
+        } else if (month.equalsIgnoreCase("aug")) {
+          calendar.set(Calendar.MONTH, Calendar.AUGUST);
+        } else if (month.equalsIgnoreCase("jul")) {
+          calendar.set(Calendar.MONTH, Calendar.JULY);
+        } else if (month.equalsIgnoreCase("jun")) {
+          calendar.set(Calendar.MONTH, Calendar.JUNE);
+        } else if (month.equalsIgnoreCase("may")) {
+          calendar.set(Calendar.MONTH, Calendar.MAY);
+        } else if (month.equalsIgnoreCase("apr")) {
+          calendar.set(Calendar.MONTH, Calendar.APRIL);
+        } else if (month.equalsIgnoreCase("mar")) {
+          calendar.set(Calendar.MONTH, Calendar.MARCH);
+        } else if (month.equalsIgnoreCase("feb")) {
+          calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
+        } else if (month.equalsIgnoreCase("jan")) {
+          calendar.set(Calendar.MONTH, Calendar.JANUARY);
+        }
+
+
+        // Output the date as the key and 1 as the value
+        date.setDate(calendar.getTime());
+        output.collect(date, one);
+      }
+    }
+  }
+
+  public static class LogReduce extends MapReduceBase
+      implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+  {
+    public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
+    {
+      // Iterate over all of the values (counts of occurrences of this word)
+      int count = 0;
+      while (values.hasNext()) {
+        // Add the value to our count
+        count += values.next().get();
+      }
+
+      // Output the word with its count (wrapped in an IntWritable)
+      output.collect(key, new IntWritable(count));
+    }
+  }
+
+
+  public int run(String[] args) throws Exception
+  {
+    // Create a configuration
+    Configuration conf = getConf();
+
+    // Create a job from the default configuration that will use the WordCount class
+    JobConf job = new JobConf(conf, LogCountsPerHour.class);
+
+    // Define our input path as the first command line argument and our output path as the second
+    Path in = new Path(args[0]);
+    Path out = new Path(args[1]);
+
+    // Create File Input/Output formats for these paths (in the job)
+    FileInputFormat.setInputPaths(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+
+    // Configure the job: name, mapper, reducer, and combiner
+    job.setJobName("LogAveragePerHour");
+    job.setMapperClass(LogMapClass.class);
+    job.setReducerClass(LogReduce.class);
+    job.setCombinerClass(LogReduce.class);
+
+    // Configure the output
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(DateWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    // Run the job
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    // Start the LogCountsPerHour MapReduce application
+    int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
new file mode 100644
index 0000000..51b082d
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * <p>LogsCountApplication class.</p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "LogsCountExample")
+public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
+{
+
+  public void LogsCountApplication()
+  {
+    setMapClass(LogCountsPerHour.LogMapClass.class);
+    // setCombineClass(LogCountsPerHour.LogReduce.class);
+    setReduceClass(LogCountsPerHour.LogReduce.class);
+    setInputFormat(TextInputFormat.class);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
new file mode 100644
index 0000000..ce00f54
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.mroperator.ReporterImpl.ReporterType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Partitioner;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * <p>
+ * MapOperator class.
+ * </p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings({ "unchecked"})
+public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>>
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(MapOperator.class);
+  private String dirName;
+  private boolean emitPartitioningCountOnce = false;
+  private boolean emitLastCountOnce = false;
+  private int operatorId;
+  private Class<? extends InputFormat<K1, V1>> inputFormatClass;
+  private transient InputFormat<K1, V1> inputFormat;
+  private transient InputSplit inputSplit;
+  private Class<? extends InputSplit> inputSplitClass;
+  private ByteArrayOutputStream outstream = new ByteArrayOutputStream();
+  private transient RecordReader<K1, V1> reader;
+  private boolean emittedAll = false;
+  public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
+  public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+  private transient JobConf jobConf;
+  @Min(1)
+  private int partitionCount = 1;
+
+  public Class<? extends InputSplit> getInputSplitClass()
+  {
+    return inputSplitClass;
+  }
+
+  public void setInputSplitClass(Class<? extends InputSplit> inputSplitClass)
+  {
+    this.inputSplitClass = inputSplitClass;
+  }
+
+  public Class<? extends InputFormat<K1, V1>> getInputFormatClass()
+  {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends InputFormat<K1, V1>> inputFormatClass)
+  {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  public String getDirName()
+  {
+    return dirName;
+  }
+
+  public void setDirName(String dirName)
+  {
+    this.dirName = dirName;
+  }
+
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (!emitPartitioningCountOnce) {
+      outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, 1));
+      emitPartitioningCountOnce = true;
+    }
+    if (reader == null) {
+      try {
+        reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter);
+      } catch (IOException e) {
+        logger.info("error getting record reader {}", e.getMessage());
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (context != null) {
+      operatorId = context.getId();
+    }
+    reporter = new ReporterImpl(ReporterType.Mapper, new Counters());
+    outputCollector = new OutputCollectorImpl<K2, V2>();
+    Configuration conf = new Configuration();
+    try {
+      inputFormat = inputFormatClass.newInstance();
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass);
+      keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray()));
+      inputSplit = (InputSplit)keyDesiralizer.deserialize(null);
+      ((ReporterImpl)reporter).setInputSplit(inputSplit);
+      reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter);
+    } catch (Exception e) {
+      logger.info("failed to initialize inputformat obj {}", inputFormat);
+      throw new RuntimeException(e);
+    }
+    InputStream stream = null;
+    if (configFile != null && configFile.length() > 0) {
+      stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+      if (stream == null) {
+        stream = ClassLoader.getSystemResourceAsStream(configFile);
+      }
+    }
+    if (stream != null) {
+      conf.addResource(stream);
+    }
+    jobConf = new JobConf(conf);
+    if (mapClass != null) {
+      try {
+        mapObject = mapClass.newInstance();
+      } catch (Exception e) {
+        logger.info("can't instantiate object {}", e.getMessage());
+      }
+
+      mapObject.configure(jobConf);
+    }
+    if (combineClass != null) {
+      try {
+        combineObject = combineClass.newInstance();
+      } catch (Exception e) {
+        logger.info("can't instantiate object {}", e.getMessage());
+      }
+      combineObject.configure(jobConf);
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (!emittedAll) {
+      try {
+        K1 key = reader.createKey();
+        V1 val = reader.createValue();
+        emittedAll = !reader.next(key, val);
+        if (!emittedAll) {
+          KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val);
+          mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter);
+          if (combineObject == null) {
+            List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+            for (KeyHashValPair<K2, V2> e : list) {
+              output.emit(e);
+            }
+            list.clear();
+          }
+        }
+      } catch (IOException ex) {
+        logger.debug(ex.toString());
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+    if (combineObject != null) {
+      Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>();
+      for (KeyHashValPair<K2, V2> tuple : list) {
+        List<V2> cacheList = cacheObject.get(tuple.getKey());
+        if (cacheList == null) {
+          cacheList = new ArrayList<V2>();
+          cacheList.add(tuple.getValue());
+          cacheObject.put(tuple.getKey(), cacheList);
+        } else {
+          cacheList.add(tuple.getValue());
+        }
+      }
+      list.clear();
+      OutputCollector<K2, V2> tempOutputCollector = new OutputCollectorImpl<K2, V2>();
+      for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) {
+        try {
+          combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter);
+        } catch (IOException e1) {
+          logger.info(e1.getMessage());
+        }
+      }
+      list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList();
+      for (KeyHashValPair<K2, V2> e : list) {
+        output.emit(e);
+      }
+    }
+    if (!emitLastCountOnce && emittedAll) {
+      outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, -1));
+      logger.info("emitting end of file {}", new KeyHashValPair<Integer, Integer>(operatorId, -1));
+      emitLastCountOnce = true;
+    }
+    list.clear();
+  }
+
+  private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception
+  {
+    FileInputFormat.setInputPaths(conf, new Path(path));
+    if (inputFormat == null) {
+      inputFormat = inputFormatClass.newInstance();
+      String inputFormatClassName = inputFormatClass.getName();
+      if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
+        ((TextInputFormat)inputFormat).configure(conf);
+      } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
+        ((KeyValueTextInputFormat)inputFormat).configure(conf);
+      }
+    }
+    return inputFormat.getSplits(conf, numSplits);
+    // return null;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<MapOperator<K1, V1, K2, V2>>> partitions)
+  {
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, PartitioningContext context)
+  {
+    int tempPartitionCount = partitionCount;
+
+    Collection c = partitions;
+    Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c;
+    Partition<MapOperator<K1, V1, K2, V2>> template;
+    Iterator<Partition<MapOperator<K1, V1, K2, V2>>> itr = operatorPartitions.iterator();
+    template = itr.next();
+    Configuration conf = new Configuration();
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    if (outstream.size() == 0) {
+      InputSplit[] splits;
+      try {
+        splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
+      } catch (Exception e1) {
+        logger.info(" can't get splits {}", e1.getMessage());
+        throw new RuntimeException(e1);
+      }
+      Collection<Partition<MapOperator<K1, V1, K2, V2>>> operList = new ArrayList<Partition<MapOperator<K1, V1, K2, V2>>>();
+      itr = operatorPartitions.iterator();
+      int size = splits.length;
+      Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
+      while (size > 0 && itr.hasNext()) {
+        Partition<MapOperator<K1, V1, K2, V2>> p = itr.next();
+        MapOperator<K1, V1, K2, V2> opr = p.getPartitionedInstance();
+        opr.setInputFormatClass(inputFormatClass);
+        opr.setMapClass(mapClass);
+        opr.setCombineClass(combineClass);
+        opr.setConfigFile(configFile);
+        try {
+          keySerializer.open(opr.getOutstream());
+          keySerializer.serialize(splits[size - 1]);
+          opr.setInputSplitClass(splits[size - 1].getClass());
+        } catch (IOException e) {
+          logger.info("error while serializing {}", e.getMessage());
+        }
+        size--;
+        operList.add(p);
+      }
+      while (size > 0) {
+        MapOperator<K1, V1, K2, V2> opr = new MapOperator<K1, V1, K2, V2>();
+        opr.setInputFormatClass(inputFormatClass);
+        opr.setMapClass(mapClass);
+        opr.setCombineClass(combineClass);
+        opr.setConfigFile(configFile);
+        try {
+          keySerializer.open(opr.getOutstream());
+          keySerializer.serialize(splits[size - 1]);
+          opr.setInputSplitClass(splits[size - 1].getClass());
+        } catch (IOException e) {
+          logger.info("error while serializing {}", e.getMessage());
+        }
+        size--;
+        operList.add(new DefaultPartition<MapOperator<K1, V1, K2, V2>>(opr));
+      }
+      try {
+        keySerializer.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return operList;
+    }
+    return null;
+  }
+
+  public ByteArrayOutputStream getOutstream()
+  {
+    return outstream;
+  }
+
+  public void setOutstream(ByteArrayOutputStream outstream)
+  {
+    this.outstream = outstream;
+  }
+
+  /**
+   * adding map code
+   */
+
+  private Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
+  private Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
+
+  private transient Mapper<K1, V1, K2, V2> mapObject;
+  private transient Reducer<K2, V2, K2, V2> combineObject;
+  private transient Reporter reporter;
+
+  private String configFile;
+
+  public String getConfigFile()
+  {
+    return configFile;
+  }
+
+  public void setConfigFile(String configFile)
+  {
+    this.configFile = configFile;
+  }
+
+  private transient OutputCollector<K2, V2> outputCollector;
+
+  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
+  {
+    return mapClass;
+  }
+
+  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
+  {
+    this.mapClass = mapClass;
+  }
+
+  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
+  {
+    return combineClass;
+  }
+
+  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
+  {
+    this.combineClass = combineClass;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
new file mode 100644
index 0000000..98f4dc7
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+
+/**
+ * <p>
+ * Abstract MapReduceApplication class.
+ * </p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "MapReduceExample")
+public abstract class MapReduceApplication<K1, V1, K2, V2> implements StreamingApplication
+{
+  Class<? extends InputFormat<K1, V1>> inputFormat;
+  Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
+  Class<? extends Reducer<K2, V2, K2, V2>> reduceClass;
+  Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
+
+  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
+  {
+    return combineClass;
+  }
+
+  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
+  {
+    this.combineClass = combineClass;
+  }
+
+  public void setInputFormat(Class<? extends InputFormat<K1, V1>> inputFormat)
+  {
+    this.inputFormat = inputFormat;
+  }
+
+  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
+  {
+    return mapClass;
+  }
+
+  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
+  {
+    this.mapClass = mapClass;
+  }
+
+  public Class<? extends Reducer<K2, V2, K2, V2>> getReduceClass()
+  {
+    return reduceClass;
+  }
+
+  public void setReduceClass(Class<? extends Reducer<K2, V2, K2, V2>> reduceClass)
+  {
+    this.reduceClass = reduceClass;
+  }
+
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String configurationFilePath = conf.get(this.getClass().getSimpleName() + ".configFile", "");
+
+    MapOperator<K1, V1, K2, V2> inputOperator = dag.addOperator("Mapper", new MapOperator<K1, V1, K2, V2>());
+    inputOperator.setInputFormatClass(inputFormat);
+
+    String configFileName = null;
+    if (configurationFilePath != null && !configurationFilePath.isEmpty()) {
+      StringTokenizer configFileTokenizer = new StringTokenizer(configurationFilePath, "/");
+      configFileName = configFileTokenizer.nextToken();
+      while (configFileTokenizer.hasMoreTokens()) {
+        configFileName = configFileTokenizer.nextToken();
+      }
+    }
+
+    inputOperator.setMapClass(mapClass);
+    inputOperator.setConfigFile(configFileName);
+    inputOperator.setCombineClass(combineClass);
+
+    ReduceOperator<K2, V2, K2, V2> reduceOpr = dag.addOperator("Reducer", new ReduceOperator<K2, V2, K2, V2>());
+    reduceOpr.setReduceClass(reduceClass);
+    reduceOpr.setConfigFile(configFileName);
+
+    HdfsKeyValOutputOperator<K2, V2> console = dag.addOperator("Console", new HdfsKeyValOutputOperator<K2, V2>());
+
+    dag.addStream("Mapped-Output", inputOperator.output, reduceOpr.input);
+    dag.addStream("Mapper-Count", inputOperator.outputCount, reduceOpr.inputCount);
+    dag.addStream("Reduced-Output", reduceOpr.output, console.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
new file mode 100644
index 0000000..db32b4a
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * <p>NewWordCountApplication class.</p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "WordCountExample")
+public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
+{
+
+  public void NewWordCountApplication()
+  {
+    setMapClass(WordCount.Map.class);
+    setReduceClass(WordCount.Reduce.class);
+    setCombineClass(WordCount.Reduce.class);
+    setInputFormat(TextInputFormat.class);
+  }
+}