You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:11 UTC
[06/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
new file mode 100644
index 0000000..ae8b551
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * MRStatusObject class.
+ * </p>
+ *
+ * @since 0.3.4
+ */
+public class MRStatusObject
+{
+ private String command;
+ /**
+ * This stores the Resource Manager/ Task Manager's host information
+ */
+ private String uri;
+ /**
+ * This field stores the job id
+ */
+ private String jobId;
+ /**
+ * This field stores the api version of the rest apis
+ */
+ private String apiVersion;
+ /**
+ * This field stores the hadoop version 1 for 1.x and 2 for 2.x
+ */
+ private int hadoopVersion;
+ /**
+ * This field stores the app id for the hadoop 2.x
+ */
+ private String appId;
+ /**
+ * This field stores the RM port information for hadoop 2.x / Task Manager server port for hadoop 1.X from where we
+ * can get the job information
+ */
+ private int rmPort;
+ /**
+ * This field stores the history server information for hadoop 2.x from where we can get the job information
+ */
+ private int historyServerPort;
+ /**
+ * This field stores the job information as json object
+ */
+ private JSONObject jsonObject;
+ /**
+ * This field tells if the object has been modified
+ */
+ private boolean modified;
+ /**
+ * This stores the mapping of map task ids to the TaskObject
+ */
+ private Map<String, TaskObject> mapJsonObject;
+ /**
+ * This stores the mapping of reduce task ids to the TaskObject
+ */
+ private Map<String, TaskObject> reduceJsonObject;
+ /**
+ * This holds the information about the various metrics like MAP_OUTPUT_RECORDS etc for this job
+ */
+ private TaskObject metricObject;
+
+ /**
+ * This holds the number of windows occurred when the new data was retrieved for this job
+ */
+ private int retrials;
+
+ /**
+ * The scheduler is used to store the job status every 1 minute
+ */
+ private transient ScheduledExecutorService statusScheduler;
+
+ /**
+ * This stores the progress of the map tasks
+ */
+ Queue<String> mapStatusHistory;
+
+ /**
+ * This stores the progress of the reduce tasks
+ */
+ Queue<String> reduceStatusHistory;
+
+ /**
+ * This stores the history of the physical memory usage
+ */
+ Queue<String> physicalMemoryStatusHistory;
+
+ /**
+ * This stores the history of the virtual memory usage
+ */
+ Queue<String> virtualMemoryStatusHistory;
+
+ /**
+ * This stores the history of the cpu
+ */
+ Queue<String> cpuStatusHistory;
+
+ /**
+ * The number of minutes for which the status history of map and reduce tasks is stored
+ */
+ private int statusHistoryCount = 60;
+
+ /**
+ * This field notifies if history status queues have changed over time
+ */
+ private boolean changedHistoryStatus;
+
+ public MRStatusObject()
+ {
+ retrials = 0;
+ modified = true;
+ mapJsonObject = new ConcurrentHashMap<String, TaskObject>();
+ reduceJsonObject = new ConcurrentHashMap<String, TaskObject>();
+ mapStatusHistory = new LinkedList<String>();
+ reduceStatusHistory = new LinkedList<String>();
+ physicalMemoryStatusHistory = new LinkedList<String>();
+ virtualMemoryStatusHistory = new LinkedList<String>();
+ cpuStatusHistory = new LinkedList<String>();
+ statusScheduler = Executors.newScheduledThreadPool(1);
+ statusScheduler.scheduleAtFixedRate(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (jsonObject != null) {
+ changedHistoryStatus = true;
+ if (mapStatusHistory.size() > statusHistoryCount) {
+ mapStatusHistory.poll();
+ reduceStatusHistory.poll();
+ physicalMemoryStatusHistory.poll();
+ virtualMemoryStatusHistory.poll();
+ cpuStatusHistory.poll();
+ }
+ if (hadoopVersion == 2) {
+ try {
+ mapStatusHistory.add(jsonObject.getJSONObject("job").getString("mapProgress"));
+ reduceStatusHistory.add(jsonObject.getJSONObject("job").getString("reduceProgress"));
+ if (metricObject.getJson() != null) {
+ JSONArray arr = metricObject.getJson().getJSONObject("jobCounters").getJSONArray("counterGroup");
+ int length = arr.length();
+ for (int i = 0; i < length; i++) {
+ if (arr.getJSONObject(i).get("counterGroupName").equals("org.apache.hadoop.mapreduce.TaskCounter")) {
+ JSONArray counters = arr.getJSONObject(i).getJSONArray("counter");
+ for (int j = 0; j < counters.length(); j++) {
+ JSONObject counterObj = counters.getJSONObject(j);
+ if (counterObj.get("name").equals("PHYSICAL_MEMORY_BYTES")) {
+ physicalMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
+ } else if (counterObj.get("name").equals("VIRTUAL_MEMORY_BYTES")) {
+ virtualMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
+ } else if (counterObj.get("name").equals("CPU_MILLISECONDS")) {
+ cpuStatusHistory.add(counterObj.getString("totalCounterValue"));
+ }
+ }
+ break;
+ }
+ }
+ }
+ } catch (JSONException e) {
+ logger.error("error setting status history {}", e.getMessage());
+ }
+ } else {
+ try {
+ mapStatusHistory.add(jsonObject.getJSONObject("mapTaskSummary").getString("progressPercentage"));
+ reduceStatusHistory.add(jsonObject.getJSONObject("reduceTaskSummary").getString("progressPercentage"));
+ JSONArray arr = jsonObject.getJSONArray("jobCounterGroupsInfo");
+ int length = arr.length();
+ for (int i = 0; i < length; i++) {
+ if (arr.getJSONObject(i).get("groupName").equals("Map-Reduce Framework")) {
+ JSONArray counters = arr.getJSONObject(i).getJSONArray("jobCountersInfo");
+ for (int j = 0; j < counters.length(); j++) {
+ JSONObject counterObj = counters.getJSONObject(j);
+ if (counterObj.get("name").equals("Physical memory (bytes) snapshot")) {
+ physicalMemoryStatusHistory.add(counterObj.getString("totalValue"));
+ } else if (counterObj.get("name").equals("Virtual memory (bytes) snapshot")) {
+ virtualMemoryStatusHistory.add(counterObj.getString("totalValue"));
+ } else if (counterObj.get("name").equals("CPU time spent (ms)")) {
+ cpuStatusHistory.add(counterObj.getString("totalValue"));
+ }
+ }
+ break;
+ }
+ }
+ } catch (JSONException e) {
+ logger.error("error setting status history {}", e.getMessage());
+ }
+ }
+ }
+ }
+ }, 0, 1, TimeUnit.MINUTES);
+ }
+
+ public Map<String, TaskObject> getMapJsonObject()
+ {
+ return mapJsonObject;
+ }
+
+ public void setMapJsonObject(Map<String, TaskObject> mapJsonObject)
+ {
+ this.mapJsonObject = mapJsonObject;
+ }
+
+ public Map<String, TaskObject> getReduceJsonObject()
+ {
+ return reduceJsonObject;
+ }
+
+ public void setReduceJsonObject(Map<String, TaskObject> reduceJsonObject)
+ {
+ this.reduceJsonObject = reduceJsonObject;
+ }
+
+ public String getUri()
+ {
+ return uri;
+ }
+
+ public void setUri(String uri)
+ {
+ this.uri = uri;
+ }
+
+ public String getJobId()
+ {
+ return jobId;
+ }
+
+ public void setJobId(String jobId)
+ {
+ this.jobId = jobId;
+ }
+
+ public String getApiVersion()
+ {
+ return apiVersion;
+ }
+
+ public void setApiVersion(String apiVersion)
+ {
+ this.apiVersion = apiVersion;
+ }
+
+ public int getHadoopVersion()
+ {
+ return hadoopVersion;
+ }
+
+ public void setHadoopVersion(int hadoopVersion)
+ {
+ this.hadoopVersion = hadoopVersion;
+ }
+
+ public String getAppId()
+ {
+ return appId;
+ }
+
+ public void setAppId(String appId)
+ {
+ this.appId = appId;
+ }
+
+ public int getRmPort()
+ {
+ return rmPort;
+ }
+
+ public void setRmPort(int rmPort)
+ {
+ this.rmPort = rmPort;
+ }
+
+ public int getHistoryServerPort()
+ {
+ return historyServerPort;
+ }
+
+ public void setHistoryServerPort(int historyServerPort)
+ {
+ this.historyServerPort = historyServerPort;
+ }
+
+ public JSONObject getJsonObject()
+ {
+ return jsonObject;
+ }
+
+ public void setJsonObject(JSONObject jsonObject)
+ {
+ this.jsonObject = jsonObject;
+ }
+
+ public boolean isChangedHistoryStatus()
+ {
+ return changedHistoryStatus;
+ }
+
+ public void setChangedHistoryStatus(boolean changedHistoryStatus)
+ {
+ this.changedHistoryStatus = changedHistoryStatus;
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that) {
+ return true;
+ }
+ if (!(that instanceof MRStatusObject)) {
+ return false;
+ }
+ if (this.hashCode() == that.hashCode()) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return (uri + jobId + apiVersion + String.valueOf(hadoopVersion)).hashCode();
+
+ }
+
+ public String getCommand()
+ {
+ return command;
+ }
+
+ public void setCommand(String command)
+ {
+ this.command = command;
+ }
+
+ public boolean isModified()
+ {
+ return modified;
+ }
+
+ public void setModified(boolean modified)
+ {
+ this.modified = modified;
+ }
+
+ public int getRetrials()
+ {
+ return retrials;
+ }
+
+ public void setRetrials(int retrials)
+ {
+ this.retrials = retrials;
+ }
+
+ public TaskObject getMetricObject()
+ {
+ return metricObject;
+ }
+
+ public void setMetricObject(TaskObject metricObject)
+ {
+ this.metricObject = metricObject;
+ }
+
+ public int getStatusHistoryCount()
+ {
+ return statusHistoryCount;
+ }
+
+ public void setStatusHistoryCount(int statusHistoryCount)
+ {
+ this.statusHistoryCount = statusHistoryCount;
+ }
+
+ public Queue<String> getMapStatusHistory()
+ {
+ return mapStatusHistory;
+ }
+
+ public Queue<String> getReduceStatusHistory()
+ {
+ return reduceStatusHistory;
+ }
+
+ public Queue<String> getPhysicalMemeoryStatusHistory()
+ {
+ return physicalMemoryStatusHistory;
+ }
+
+ public Queue<String> getVirtualMemoryStatusHistory()
+ {
+ return virtualMemoryStatusHistory;
+ }
+
+ public Queue<String> getCpuStatusHistory()
+ {
+ return cpuStatusHistory;
+ }
+
+ public static class TaskObject
+ {
+ /**
+ * This field stores the task information as json
+ */
+ private JSONObject json;
+ /**
+ * This field tells if the object was modified
+ */
+ private boolean modified;
+
+ public TaskObject(JSONObject json)
+ {
+ modified = true;
+ this.json = json;
+ }
+
+ /**
+ * This returns the task information as json
+ *
+ * @return
+ */
+ public JSONObject getJson()
+ {
+ return json;
+ }
+
+ /**
+ * This stores the task information as json
+ *
+ * @param json
+ */
+ public void setJson(JSONObject json)
+ {
+ this.json = json;
+ }
+
+ /**
+ * This returns if the json object has been modified
+ *
+ * @return
+ */
+ public boolean isModified()
+ {
+ return modified;
+ }
+
+ /**
+ * This sets if the json object is modified
+ *
+ * @param modified
+ */
+ public void setModified(boolean modified)
+ {
+ this.modified = modified;
+ }
+
+ /**
+ * This returns the string format of the json object
+ *
+ * @return
+ */
+ public String getJsonString()
+ {
+ return json.toString();
+ }
+ }
+
+ private static Logger logger = LoggerFactory.getLogger(MRStatusObject.class);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
new file mode 100644
index 0000000..1a82b99
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.io.IOException;
+
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+/**
+ * Static helpers for fetching the response body of a url and for parsing json text.
+ * Both helpers report failure by returning null instead of throwing.
+ *
+ * @since 0.3.4
+ */
+public class MRUtil
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(MRUtil.class);
+
+  /**
+   * This method returns the response content for a given url
+   *
+   * @param url the url to issue a GET request against
+   * @return the trimmed response body, or null if the request could not be built or executed or the
+   *         response carried no body
+   */
+  public static String getJsonForURL(String url)
+  {
+    HttpClient httpclient = new DefaultHttpClient();
+    logger.debug(url);
+    try {
+      // The request is built inside the try so that a malformed url takes the same
+      // "log and return null" path as a failed request.
+      HttpGet httpget = new HttpGet(url);
+
+      // Create a response handler
+      ResponseHandler<String> responseHandler = new BasicResponseHandler();
+      String responseBody = httpclient.execute(httpget, responseHandler);
+
+      // The handler yields null for a response without an entity; do not trim that.
+      if (responseBody == null) {
+        return null;
+      }
+      return responseBody.trim();
+    } catch (ClientProtocolException e) {
+      logger.debug(e.getMessage());
+      return null;
+    } catch (IOException e) {
+      logger.debug(e.getMessage());
+      return null;
+    } catch (Exception e) {
+      logger.debug(e.getMessage());
+      return null;
+    } finally {
+      // Always release the connections held by this one-shot client.
+      httpclient.getConnectionManager().shutdown();
+    }
+  }
+
+  /**
+   * This method returns the JSONObject for a given string
+   *
+   * @param json the json text to parse
+   * @return the parsed object, or null if the text could not be parsed
+   */
+  public static JSONObject getJsonObject(String json)
+  {
+    try {
+      return new JSONObject(json);
+    } catch (Exception e) {
+      logger.debug("{}", e.getMessage());
+      return null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
new file mode 100644
index 0000000..f4ce4fb
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * Operator that turns each incoming key/value map into an {@link MRStatusObject} and emits it on
+ * {@link #output}. Keys that match none of the known query constants are ignored.
+ *
+ * @since 0.9.0
+ */
+public class MapToMRObjectOperator implements Operator
+{
+
+  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
+  {
+    /**
+     * Copies every recognised entry of the tuple onto a new status object and emits that object.
+     * Numeric entries are parsed with {@link Integer#parseInt(String)}.
+     */
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      final MRStatusObject status = new MRStatusObject();
+
+      for (Map.Entry<String, String> entry : tuple.entrySet()) {
+        final String key = entry.getKey();
+        final String value = entry.getValue();
+
+        if (key.equals(Constants.QUERY_KEY_COMMAND)) {
+          status.setCommand(value);
+        } else if (key.equals(Constants.QUERY_API_VERSION)) {
+          status.setApiVersion(value);
+        } else if (key.equals(Constants.QUERY_APP_ID)) {
+          status.setAppId(value);
+        } else if (key.equals(Constants.QUERY_HADOOP_VERSION)) {
+          status.setHadoopVersion(Integer.parseInt(value));
+        } else if (key.equals(Constants.QUERY_HOST_NAME)) {
+          status.setUri(value);
+        } else if (key.equals(Constants.QUERY_HS_PORT)) {
+          status.setHistoryServerPort(Integer.parseInt(value));
+        } else if (key.equals(Constants.QUERY_JOB_ID)) {
+          status.setJobId(value);
+        } else if (key.equals(Constants.QUERY_RM_PORT)) {
+          status.setRmPort(Integer.parseInt(value));
+        }
+      }
+
+      output.emit(status);
+    }
+  };
+
+  public final transient DefaultOutputPort<MRStatusObject> output = new DefaultOutputPort<MRStatusObject>();
+
+  /** Nothing to set up. */
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
+
+  /** Nothing to tear down. */
+  @Override
+  public void teardown()
+  {
+  }
+
+  /** No per-window work at window start. */
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  /** No per-window work at window end. */
+  @Override
+  public void endWindow()
+  {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/resources/META-INF/properties.xml b/examples/mrmonitor/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..fdda52b
--- /dev/null
+++ b/examples/mrmonitor/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
+    <value>4</value>
+  </property>
+  <!-- maxJobs is declared once; the original file repeated this property with the same value -->
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
+    <value>contrib.summit.mrDebugger.jobResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mapResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.reduceResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
+    <value>contrib.summit.mrDebugger.counterResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/mrdebugger.html
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/resources/mrdebugger.html b/examples/mrmonitor/src/main/resources/mrdebugger.html
new file mode 100644
index 0000000..dddde06
--- /dev/null
+++ b/examples/mrmonitor/src/main/resources/mrdebugger.html
@@ -0,0 +1,237 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!doctype html>
+<html>
+<head>
+<title>MapReduce Monitoring Example</title>
+
+<META HTTP-EQUIV="CACHE-CONTROL" CONTENT="NO-CACHE">
+<meta name="viewport" content="initial-scale=1.0, user-scalable=no">
+<meta charset="utf-8">
+<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
+<script src="http://ajax.googleapis.com/ajax/libs/jqueryui/1.10.3/jquery-ui.min.js"></script>
+<!--script src="js/vendor/jquery/dist/jquery.js"></script-->
+<style>
+ body {
+ margin: 0;
+ }
+
+ .phone-input {
+ margin-left: 0.5em;
+ margin-right: 0.5em;
+ }
+</style>
+
+</head>
+
+
+<body>
+
+
+<script>
+
+
+
+var map;
+var markers = {};
+
+$(function() {
+
+ $("#query1AddButton").click(function() {
+
+ var app_id = $("input#app_id").val();
+ var hostname = $("input#hostname").val();
+ var job_id = $("input#job_id").val();
+ var hadoop_version= $("input#hadoop_version").val();
+ var api_version = $("input#api_version").val();
+ var rm_port = $("input#rm_port").val();
+ var hs_port = $("input#hs_port").val();
+
+ var jsonData = {
+ command : 'add',
+ hostname:hostname,
+ app_id:app_id,
+ job_id:job_id,
+ hadoop_version:hadoop_version,
+ api_version:api_version,
+ rm_port:rm_port,
+ hs_port:hs_port
+ };
+
+ sendQuery(jsonData, function() {
+ $('#query1SubmitConfirm').html("<div id='message'></div>");
+ $('#message').html("<h2>Add submitted to application!</h2>")
+ .append("<p>Result will appear on page shortly.</p>");
+ });
+
+ return false;
+ });
+
+ $("#query1DeleteButton").click(function() {
+
+ var job_id = $("input#job_id").val();
+
+ var jsonData = {
+ command : 'delete',
+ query : job_id
+ };
+
+ sendQuery(jsonData, function() {
+ $('#query1SubmitConfirm').html("<div id='message'></div>");
+ $('#message').html("<h2>Add " + phone + " submitted to application!</h2>")
+ .append("<p>Result will appear on page shortly.</p>");
+ });
+
+ return false;
+ });
+
+ function sendQuery(jsonData, callback) {
+ var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
+
+ ws.onopen = function () {
+ var topic = "contrib.summit.mrDebugger.mrDebuggerQuery";
+ var msg = JSON.stringify({ "type" : "publish", "topic" : topic, "data" : jsonData });
+ ws.send(msg);
+ console.log("published to: " + topic + " data: " + msg);
+ ws.close();
+ if (callback) callback();
+ };
+
+ ws.onerror = function (error) {
+ console.log('WebSocket Error ' + error);
+ };
+
+ ws.onmessage = function (e) {
+ console.log('Server: ' + e.data);
+ };
+ ws.onclose = function (e) {
+ console.log('close: ' , e);
+ };
+
+ }
+
+ var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
+ var topic = "contrib.summit.mrDebugger.jobResult";
+
+ ws.onopen = function () {
+ var msg = JSON.stringify({ "type":"subscribe", "topic": topic});
+ console.log("sending: " + msg);
+ ws.send(msg);
+ };
+
+ ws.onerror = function (error) {
+ console.log('WebSocket Error ' + error);
+ };
+
+ ws.onmessage = function (e){
+
+ $('#jobQueryResult').append(e.data+"\n");
+ };
+
+
+ var mapws = new WebSocket('ws://'+window.location.host+'/pubsub');
+ var maptopic = "contrib.summit.mrDebugger.mapResult";
+
+ mapws.onopen = function () {
+ var msg = JSON.stringify({ "type":"subscribe", "topic": maptopic});
+ console.log("sending: " + msg);
+ mapws.send(msg);
+ };
+
+ mapws.onerror = function (error) {
+ console.log('WebSocket Error ' + error);
+ };
+
+ mapws.onmessage = function (e){
+
+ $('#jobMapQueryResult').append(e.data+"\n");
+ };
+
+
+ var reducews = new WebSocket('ws://'+window.location.host+'/pubsub');
+ var reducetopic = "contrib.summit.mrDebugger.reduceResult";
+
+ reducews.onopen = function () {
+ var msg = JSON.stringify({ "type":"subscribe", "topic": reducetopic});
+ console.log("sending: " + msg);
+ reducews.send(msg);
+ };
+
+ reducews.onerror = function (error) {
+ console.log('WebSocket Error ' + error);
+ };
+
+ reducews.onmessage = function (e){
+
+ $('#jobReduceQueryResult').append(e.data+"\n");
+ };
+
+ });
+
+
+</script>
+
+
+<!-- Query form: each label's "for" attribute names the id of the input it describes -->
+<div id="query1FormDiv">
+  <form name="query1" action="">
+    <p>
+      <label for="app_id" id="app_id_label">Application Id</label>
+      <input type="text" name="app_id" id="app_id" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="job_id" id="job_id_label">Job Id</label>
+      <input type="text" name="job_id" id="job_id" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="hostname" id="hostname_label">Hostname</label>
+      <input type="text" name="hostname" id="hostname" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="rm_port" id="rm_port_label">RM port</label>
+      <input type="text" name="rm_port" id="rm_port" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="hs_port" id="hs_port_label">History Server port</label>
+      <input type="text" name="hs_port" id="hs_port" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="hadoop_version" id="hadoop_version_label">Hadoop Version</label>
+      <input type="text" name="hadoop_version" id="hadoop_version" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <label for="api_version" id="api_version_label">API Version</label>
+      <input type="text" name="api_version" id="api_version" size="30" value="" class="phone-input" />
+    </p>
+    <p>
+      <input type="submit" name="command" class="button" id="query1AddButton" value="Add" />
+      <input type="submit" name="command" class="button" id="query1DeleteButton" value="Delete" />
+      <input type="submit" name="command" class="button" id="query1ClearButton" value="Clear" />
+    </p>
+  </form>
+  <div id="query1SubmitConfirm"></div>
+  <div>Job: <span id="jobQueryResult"></span></div>
+  <div>Map Task: <span id="jobMapQueryResult"></span></div>
+  <div>Reduce Job: <span id="jobReduceQueryResult"></span></div>
+</div>
+
+</body>
+</html>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
new file mode 100644
index 0000000..b094ddb
--- /dev/null
+++ b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import javax.servlet.Servlet;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
+
+/**
+ * Smoke test that runs {@link MRMonitoringApplication} in local mode while an embedded Jetty server
+ * serves a {@link SamplePubSubWebSocketServlet} as the gateway address.
+ *
+ * @since 0.3.4
+ */
+public class MrMonitoringApplicationTest
+{
+
+  /**
+   * Starts the embedded server on a free port, points the application's gateway address at it and
+   * runs the application through the local controller with a limit of 10000.
+   *
+   * @throws Exception if the server or the application fails
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-monitoring.xml");
+    Server server = new Server(0);
+    Servlet servlet = new SamplePubSubWebSocketServlet();
+    ServletHolder sh = new ServletHolder(servlet);
+    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
+    contextHandler.addServlet(sh, "/pubsub");
+    contextHandler.addServlet(sh, "/*");
+    server.start();
+    // Stop the server even when preparing or running the application throws,
+    // so a failing test does not leave the server running.
+    try {
+      Connector[] connector = server.getConnectors();
+      conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
+
+      MRMonitoringApplication application = new MRMonitoringApplication();
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(application, conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000);
+    } finally {
+      server.stop();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
new file mode 100644
index 0000000..88bae44
--- /dev/null
+++ b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+  <!-- JobMonitor operator: partition count, application window count and job limit. -->
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.INITIAL_PARTITION_COUNT</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
+    <value>4</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
+    <value>25</value>
+  </property>
+  <!-- Topic names for the query operator and the result operators. -->
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
+    <value>contrib.summit.mrDebugger.jobResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.mapResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
+    <value>contrib.summit.mrDebugger.reduceResult</value>
+  </property>
+  <property>
+    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
+    <value>contrib.summit.mrDebugger.counterResult</value>
+  </property>
+  <!-- Locality of the QueryConversion stream. -->
+  <property>
+    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/test/resources/log4j.properties b/examples/mrmonitor/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mrmonitor/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Root logger: DEBUG level, attached to the CONSOLE appender only.
+log4j.rootLogger=DEBUG,CONSOLE
+
+# Console appender; its threshold is taken from test.log.console.threshold below.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+# Rolling file appender writing to /tmp/app.log; defined here but not listed in rootLogger.
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+# Per-package levels: everything under "org" at info, the com.datatorrent and
+# org.apache.apex packages at debug.
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/pom.xml b/examples/mroperator/pom.xml
new file mode 100644
index 0000000..c9a7b65
--- /dev/null
+++ b/examples/mroperator/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Maven module for the MR operator example; group and version come from the malhar-examples parent. -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-mroperator</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar MR Operator Example</name>
+ <description></description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- skipTests is set to true, so the tests of this module are not run during the build. -->
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <!-- Hadoop MapReduce client API in provided scope; the wildcard exclusion drops all of its transitive dependencies. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/assemble/appPackage.xml b/examples/mroperator/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mroperator/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <!-- Assembly descriptor "appPackage": builds a jar-format archive without a base directory. -->
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <!-- The module's own jar from target/ goes to /app. -->
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <!-- Everything under target/deps goes to /lib. -->
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <!-- XML files from src/site/conf go to /conf. -->
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <!-- META-INF resources are copied to /META-INF. -->
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <!-- Resources under src/main/resources/app are copied to /app as well. -->
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
new file mode 100644
index 0000000..4cf1f5a
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>DateWritable class.</p>
+ *
+ * @since 0.9.0
+ */
+public class DateWritable implements WritableComparable<DateWritable>
+{
+ private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
+ private Date date;
+
+ public Date getDate()
+ {
+ return date;
+ }
+
+ public void setDate( Date date )
+ {
+ this.date = date;
+ }
+
+ public void readFields( DataInput in ) throws IOException
+ {
+ date = new Date( in.readLong() );
+ }
+
+ public void write( DataOutput out ) throws IOException
+ {
+ out.writeLong( date.getTime() );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return toString().equals(o.toString());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ public String toString()
+ {
+ return formatter.format( date);
+ }
+
+ public int compareTo( DateWritable other )
+ {
+ return date.compareTo( other.getDate() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
new file mode 100644
index 0000000..94e17c1
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * Adapter for writing KeyHashValPair objects to HDFS
+ * <p>
+ * Each tuple is written as its {@code toString()} representation followed by a newline.<br/>
+ * </p>
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ * @since 0.9.4
+ */
+public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOperator<KeyHashValPair<K, V>>
+{
+  /**
+   * Converts a tuple into the bytes of one output line.
+   *
+   * @param t the tuple to serialize
+   * @return the UTF-8 bytes of {@code t.toString()} plus a trailing newline
+   */
+  @Override
+  public byte[] getBytesForTuple(KeyHashValPair<K, V> t)
+  {
+    // Encode explicitly as UTF-8 so the file content does not depend on the JVM's default charset.
+    return (t.toString() + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
new file mode 100644
index 0000000..af97bc4
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Application that wires the {@link LineIndexer} mapper and reducer into
+ * {@link MapReduceApplication}, reading input through {@link TextInputFormat}.
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "InvertedIndexExample")
+public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
+{
+
+  /**
+   * Configures the map class, reduce class and input format.
+   * <p>
+   * Declared public so that this public class can also be instantiated from outside
+   * its package (e.g. reflectively by a launcher).
+   * </p>
+   */
+  public InvertedIndexApplication()
+  {
+    setMapClass(LineIndexer.LineIndexMapper.class);
+    setReduceClass(LineIndexer.LineIndexReducer.class);
+    setInputFormat(TextInputFormat.class);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
new file mode 100644
index 0000000..a2c589d
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Inverted-index example written against the {@code org.apache.hadoop.mapred} API:
+ * maps every word to the names of the files it occurs in.
+ *
+ * @since 0.9.0
+ */
+public class LineIndexer
+{
+
+  /**
+   * Emits a (lower-cased token, file name) pair for every token of an input line.
+   */
+  public static class LineIndexMapper extends MapReduceBase
+      implements Mapper<LongWritable, Text, Text, Text>
+  {
+    // Buffers reused across map() calls. They are instance fields rather than static
+    // so that mapper instances living in the same JVM do not share mutable state.
+    private final Text word = new Text();
+    private final Text location = new Text();
+
+    /**
+     * @param key      key of the input record (not used)
+     * @param val      the line of text to tokenize
+     * @param output   collector receiving one (token, file name) pair per token
+     * @param reporter supplies the input split; it is cast to {@link FileSplit}
+     * @throws IOException if the collector fails
+     */
+    @Override
+    public void map(LongWritable key, Text val,
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
+      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
+      String fileName = fileSplit.getPath().getName();
+      location.set(fileName);
+
+      String line = val.toString();
+      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, location);
+      }
+    }
+  }
+
+  /**
+   * Joins all values of a key into a single string separated by {@code ", "}.
+   */
+  public static class LineIndexReducer extends MapReduceBase
+      implements Reducer<Text, Text, Text, Text>
+  {
+    /**
+     * @param key      the key being reduced
+     * @param values   the values collected for the key
+     * @param output   collector receiving the key and the joined values
+     * @param reporter not used
+     * @throws IOException if the collector fails
+     */
+    @Override
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+    {
+      boolean first = true;
+      StringBuilder toReturn = new StringBuilder();
+      while (values.hasNext()) {
+        if (!first) {
+          toReturn.append(", ");
+        }
+        first = false;
+        toReturn.append(values.next().toString());
+      }
+
+      output.collect(key, new Text(toReturn.toString()));
+    }
+  }
+
+  /**
+   * The actual main() method for our program; this is the
+   * "driver" for the MapReduce job. It reads from the path "input" and
+   * writes to the path "output".
+   */
+  public static void main(String[] args)
+  {
+    JobClient client = new JobClient();
+    JobConf conf = new JobConf(LineIndexer.class);
+
+    conf.setJobName("LineIndexer");
+
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.addInputPath(conf, new Path("input"));
+    FileOutputFormat.setOutputPath(conf, new Path("output"));
+
+    conf.setMapperClass(LineIndexMapper.class);
+    conf.setReducerClass(LineIndexReducer.class);
+
+    client.setConf(conf);
+
+    try {
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      // Failures are reported on stderr only; the process still exits normally.
+      e.printStackTrace();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
new file mode 100644
index 0000000..6255810
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * <p>LogCountsPerHour class.</p>
+ *
+ * @since 0.9.0
+ */
+public class LogCountsPerHour extends Configured implements Tool
+{
+
+ public static class LogMapClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, DateWritable, IntWritable>
+ {
+ private DateWritable date = new DateWritable();
+ private static final IntWritable one = new IntWritable(1);
+
+ public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
+ {
+ // Get the value as a String; it is of the format:
+ // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
+ String text = value.toString();
+
+ // Get the date and time
+ int openBracket = text.indexOf('[');
+ int closeBracket = text.indexOf(']');
+ if (openBracket != -1 && closeBracket != -1) {
+ // Read the date
+ String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
+
+ // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
+ int index = 0;
+ int nextIndex = dateString.indexOf('/');
+ int day = Integer.parseInt(dateString.substring(index, nextIndex));
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf('/', index + 1);
+ String month = dateString.substring(index + 1, nextIndex);
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf(':', index);
+ int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf(':', index + 1);
+ int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+ // Build a calendar object for this date
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.DATE, day);
+ calendar.set(Calendar.YEAR, year);
+ calendar.set(Calendar.HOUR, hour);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+
+ if (month.equalsIgnoreCase("dec")) {
+ calendar.set(Calendar.MONTH, Calendar.DECEMBER);
+ } else if (month.equalsIgnoreCase("nov")) {
+ calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
+ } else if (month.equalsIgnoreCase("oct")) {
+ calendar.set(Calendar.MONTH, Calendar.OCTOBER);
+ } else if (month.equalsIgnoreCase("sep")) {
+ calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
+ } else if (month.equalsIgnoreCase("aug")) {
+ calendar.set(Calendar.MONTH, Calendar.AUGUST);
+ } else if (month.equalsIgnoreCase("jul")) {
+ calendar.set(Calendar.MONTH, Calendar.JULY);
+ } else if (month.equalsIgnoreCase("jun")) {
+ calendar.set(Calendar.MONTH, Calendar.JUNE);
+ } else if (month.equalsIgnoreCase("may")) {
+ calendar.set(Calendar.MONTH, Calendar.MAY);
+ } else if (month.equalsIgnoreCase("apr")) {
+ calendar.set(Calendar.MONTH, Calendar.APRIL);
+ } else if (month.equalsIgnoreCase("mar")) {
+ calendar.set(Calendar.MONTH, Calendar.MARCH);
+ } else if (month.equalsIgnoreCase("feb")) {
+ calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
+ } else if (month.equalsIgnoreCase("jan")) {
+ calendar.set(Calendar.MONTH, Calendar.JANUARY);
+ }
+
+
+ // Output the date as the key and 1 as the value
+ date.setDate(calendar.getTime());
+ output.collect(date, one);
+ }
+ }
+ }
+
+ public static class LogReduce extends MapReduceBase
+ implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+ {
+ public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
+ {
+ // Iterate over all of the values (counts of occurrences of this word)
+ int count = 0;
+ while (values.hasNext()) {
+ // Add the value to our count
+ count += values.next().get();
+ }
+
+ // Output the word with its count (wrapped in an IntWritable)
+ output.collect(key, new IntWritable(count));
+ }
+ }
+
+
+ public int run(String[] args) throws Exception
+ {
+ // Create a configuration
+ Configuration conf = getConf();
+
+ // Create a job from the default configuration that will use the WordCount class
+ JobConf job = new JobConf(conf, LogCountsPerHour.class);
+
+ // Define our input path as the first command line argument and our output path as the second
+ Path in = new Path(args[0]);
+ Path out = new Path(args[1]);
+
+ // Create File Input/Output formats for these paths (in the job)
+ FileInputFormat.setInputPaths(job, in);
+ FileOutputFormat.setOutputPath(job, out);
+
+ // Configure the job: name, mapper, reducer, and combiner
+ job.setJobName("LogAveragePerHour");
+ job.setMapperClass(LogMapClass.class);
+ job.setReducerClass(LogReduce.class);
+ job.setCombinerClass(LogReduce.class);
+
+ // Configure the output
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(DateWritable.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ // Run the job
+ JobClient.runJob(job);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ // Start the LogCountsPerHour MapReduce application
+ int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
+ System.exit(res);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
new file mode 100644
index 0000000..51b082d
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Application that wires the {@link LogCountsPerHour} mapper and reducer into
+ * {@link MapReduceApplication}, reading input through {@link TextInputFormat}.
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "LogsCountExample")
+public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
+{
+
+  /**
+   * Configures the map class, reduce class and input format.
+   * <p>
+   * This has to be a constructor, i.e. declared without a return type. Declared as a
+   * {@code void} method it is not run on instantiation and none of the settings below
+   * are applied.
+   * </p>
+   */
+  public LogsCountApplication()
+  {
+    setMapClass(LogCountsPerHour.LogMapClass.class);
+    // setCombineClass(LogCountsPerHour.LogReduce.class);
+    setReduceClass(LogCountsPerHour.LogReduce.class);
+    setInputFormat(TextInputFormat.class);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
new file mode 100644
index 0000000..ce00f54
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.examples.mroperator.ReporterImpl.ReporterType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Partitioner;
+
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * <p>
+ * MapOperator class.
+ * </p>
+ *
+ * @since 0.9.0
+ */
+@SuppressWarnings({ "unchecked"})
+public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>>
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(MapOperator.class);
+ private String dirName;
+ private boolean emitPartitioningCountOnce = false;
+ private boolean emitLastCountOnce = false;
+ private int operatorId;
+ private Class<? extends InputFormat<K1, V1>> inputFormatClass;
+ private transient InputFormat<K1, V1> inputFormat;
+ private transient InputSplit inputSplit;
+ private Class<? extends InputSplit> inputSplitClass;
+ private ByteArrayOutputStream outstream = new ByteArrayOutputStream();
+ private transient RecordReader<K1, V1> reader;
+ private boolean emittedAll = false;
+ public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
+ public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+ private transient JobConf jobConf;
+ @Min(1)
+ private int partitionCount = 1;
+
+ public Class<? extends InputSplit> getInputSplitClass()
+ {
+ return inputSplitClass;
+ }
+
+ public void setInputSplitClass(Class<? extends InputSplit> inputSplitClass)
+ {
+ this.inputSplitClass = inputSplitClass;
+ }
+
+ public Class<? extends InputFormat<K1, V1>> getInputFormatClass()
+ {
+ return inputFormatClass;
+ }
+
+ public void setInputFormatClass(Class<? extends InputFormat<K1, V1>> inputFormatClass)
+ {
+ this.inputFormatClass = inputFormatClass;
+ }
+
+ public String getDirName()
+ {
+ return dirName;
+ }
+
+ public void setDirName(String dirName)
+ {
+ this.dirName = dirName;
+ }
+
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ if (!emitPartitioningCountOnce) {
+ outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, 1));
+ emitPartitioningCountOnce = true;
+ }
+ if (reader == null) {
+ try {
+ reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter);
+ } catch (IOException e) {
+ logger.info("error getting record reader {}", e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ if (context != null) {
+ operatorId = context.getId();
+ }
+ reporter = new ReporterImpl(ReporterType.Mapper, new Counters());
+ outputCollector = new OutputCollectorImpl<K2, V2>();
+ Configuration conf = new Configuration();
+ try {
+ inputFormat = inputFormatClass.newInstance();
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass);
+ keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray()));
+ inputSplit = (InputSplit)keyDesiralizer.deserialize(null);
+ ((ReporterImpl)reporter).setInputSplit(inputSplit);
+ reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter);
+ } catch (Exception e) {
+ logger.info("failed to initialize inputformat obj {}", inputFormat);
+ throw new RuntimeException(e);
+ }
+ InputStream stream = null;
+ if (configFile != null && configFile.length() > 0) {
+ stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+ if (stream == null) {
+ stream = ClassLoader.getSystemResourceAsStream(configFile);
+ }
+ }
+ if (stream != null) {
+ conf.addResource(stream);
+ }
+ jobConf = new JobConf(conf);
+ if (mapClass != null) {
+ try {
+ mapObject = mapClass.newInstance();
+ } catch (Exception e) {
+ logger.info("can't instantiate object {}", e.getMessage());
+ }
+
+ mapObject.configure(jobConf);
+ }
+ if (combineClass != null) {
+ try {
+ combineObject = combineClass.newInstance();
+ } catch (Exception e) {
+ logger.info("can't instantiate object {}", e.getMessage());
+ }
+ combineObject.configure(jobConf);
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (!emittedAll) {
+ try {
+ K1 key = reader.createKey();
+ V1 val = reader.createValue();
+ emittedAll = !reader.next(key, val);
+ if (!emittedAll) {
+ KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val);
+ mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter);
+ if (combineObject == null) {
+ List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+ for (KeyHashValPair<K2, V2> e : list) {
+ output.emit(e);
+ }
+ list.clear();
+ }
+ }
+ } catch (IOException ex) {
+ logger.debug(ex.toString());
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+ if (combineObject != null) {
+ Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>();
+ for (KeyHashValPair<K2, V2> tuple : list) {
+ List<V2> cacheList = cacheObject.get(tuple.getKey());
+ if (cacheList == null) {
+ cacheList = new ArrayList<V2>();
+ cacheList.add(tuple.getValue());
+ cacheObject.put(tuple.getKey(), cacheList);
+ } else {
+ cacheList.add(tuple.getValue());
+ }
+ }
+ list.clear();
+ OutputCollector<K2, V2> tempOutputCollector = new OutputCollectorImpl<K2, V2>();
+ for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) {
+ try {
+ combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter);
+ } catch (IOException e1) {
+ logger.info(e1.getMessage());
+ }
+ }
+ list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList();
+ for (KeyHashValPair<K2, V2> e : list) {
+ output.emit(e);
+ }
+ }
+ if (!emitLastCountOnce && emittedAll) {
+ outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, -1));
+ logger.info("emitting end of file {}", new KeyHashValPair<Integer, Integer>(operatorId, -1));
+ emitLastCountOnce = true;
+ }
+ list.clear();
+ }
+
+ private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception
+ {
+ FileInputFormat.setInputPaths(conf, new Path(path));
+ if (inputFormat == null) {
+ inputFormat = inputFormatClass.newInstance();
+ String inputFormatClassName = inputFormatClass.getName();
+ if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
+ ((TextInputFormat)inputFormat).configure(conf);
+ } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
+ ((KeyValueTextInputFormat)inputFormat).configure(conf);
+ }
+ }
+ return inputFormat.getSplits(conf, numSplits);
+ // return null;
+ }
+
+ @Override
+ public void partitioned(Map<Integer, Partition<MapOperator<K1, V1, K2, V2>>> partitions)
+ {
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, PartitioningContext context)
+ {
+ int tempPartitionCount = partitionCount;
+
+ Collection c = partitions;
+ Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c;
+ Partition<MapOperator<K1, V1, K2, V2>> template;
+ Iterator<Partition<MapOperator<K1, V1, K2, V2>>> itr = operatorPartitions.iterator();
+ template = itr.next();
+ Configuration conf = new Configuration();
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ if (outstream.size() == 0) {
+ InputSplit[] splits;
+ try {
+ splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
+ } catch (Exception e1) {
+ logger.info(" can't get splits {}", e1.getMessage());
+ throw new RuntimeException(e1);
+ }
+ Collection<Partition<MapOperator<K1, V1, K2, V2>>> operList = new ArrayList<Partition<MapOperator<K1, V1, K2, V2>>>();
+ itr = operatorPartitions.iterator();
+ int size = splits.length;
+ Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
+ while (size > 0 && itr.hasNext()) {
+ Partition<MapOperator<K1, V1, K2, V2>> p = itr.next();
+ MapOperator<K1, V1, K2, V2> opr = p.getPartitionedInstance();
+ opr.setInputFormatClass(inputFormatClass);
+ opr.setMapClass(mapClass);
+ opr.setCombineClass(combineClass);
+ opr.setConfigFile(configFile);
+ try {
+ keySerializer.open(opr.getOutstream());
+ keySerializer.serialize(splits[size - 1]);
+ opr.setInputSplitClass(splits[size - 1].getClass());
+ } catch (IOException e) {
+ logger.info("error while serializing {}", e.getMessage());
+ }
+ size--;
+ operList.add(p);
+ }
+ while (size > 0) {
+ MapOperator<K1, V1, K2, V2> opr = new MapOperator<K1, V1, K2, V2>();
+ opr.setInputFormatClass(inputFormatClass);
+ opr.setMapClass(mapClass);
+ opr.setCombineClass(combineClass);
+ opr.setConfigFile(configFile);
+ try {
+ keySerializer.open(opr.getOutstream());
+ keySerializer.serialize(splits[size - 1]);
+ opr.setInputSplitClass(splits[size - 1].getClass());
+ } catch (IOException e) {
+ logger.info("error while serializing {}", e.getMessage());
+ }
+ size--;
+ operList.add(new DefaultPartition<MapOperator<K1, V1, K2, V2>>(opr));
+ }
+ try {
+ keySerializer.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return operList;
+ }
+ return null;
+ }
+
+ public ByteArrayOutputStream getOutstream()
+ {
+ return outstream;
+ }
+
+ public void setOutstream(ByteArrayOutputStream outstream)
+ {
+ this.outstream = outstream;
+ }
+
+ /**
+ * adding map code
+ */
+
+ private Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
+ private Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
+
+ private transient Mapper<K1, V1, K2, V2> mapObject;
+ private transient Reducer<K2, V2, K2, V2> combineObject;
+ private transient Reporter reporter;
+
+ private String configFile;
+
+ public String getConfigFile()
+ {
+ return configFile;
+ }
+
+ public void setConfigFile(String configFile)
+ {
+ this.configFile = configFile;
+ }
+
+ private transient OutputCollector<K2, V2> outputCollector;
+
+ public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
+ {
+ return mapClass;
+ }
+
+ public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
+ {
+ this.mapClass = mapClass;
+ }
+
+ public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
+ {
+ return combineClass;
+ }
+
+ public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
+ {
+ this.combineClass = combineClass;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
new file mode 100644
index 0000000..98f4dc7
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+
+/**
+ * <p>
+ * Abstract MapReduceApplication class.
+ * </p>
+ * <p>
+ * Builds a three-operator DAG: a {@link MapOperator} named "Mapper", a
+ * {@link ReduceOperator} named "Reducer" and an {@link HdfsKeyValOutputOperator}
+ * named "Console". Subclasses supply the input format, mapper, reducer and optional
+ * combiner classes through the setters.
+ * </p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "MapReduceExample")
+public abstract class MapReduceApplication<K1, V1, K2, V2> implements StreamingApplication
+{
+  Class<? extends InputFormat<K1, V1>> inputFormat;
+  Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
+  Class<? extends Reducer<K2, V2, K2, V2>> reduceClass;
+  Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
+
+  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
+  {
+    return combineClass;
+  }
+
+  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
+  {
+    this.combineClass = combineClass;
+  }
+
+  public void setInputFormat(Class<? extends InputFormat<K1, V1>> inputFormat)
+  {
+    this.inputFormat = inputFormat;
+  }
+
+  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
+  {
+    return mapClass;
+  }
+
+  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
+  {
+    this.mapClass = mapClass;
+  }
+
+  public Class<? extends Reducer<K2, V2, K2, V2>> getReduceClass()
+  {
+    return reduceClass;
+  }
+
+  public void setReduceClass(Class<? extends Reducer<K2, V2, K2, V2>> reduceClass)
+  {
+    this.reduceClass = reduceClass;
+  }
+
+
+  /**
+   * Adds the mapper, reducer and output operators to the DAG and connects them.
+   *
+   * The property {@code <SimpleClassName>.configFile} may hold the path of a job
+   * configuration file; only its last path component is passed on to the operators.
+   *
+   * @param dag DAG to populate
+   * @param conf configuration the {@code configFile} property is read from
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String configurationFilePath = conf.get(this.getClass().getSimpleName() + ".configFile", "");
+    String configFileName = lastPathComponent(configurationFilePath);
+
+    MapOperator<K1, V1, K2, V2> inputOperator = dag.addOperator("Mapper", new MapOperator<K1, V1, K2, V2>());
+    inputOperator.setInputFormatClass(inputFormat);
+    inputOperator.setMapClass(mapClass);
+    inputOperator.setConfigFile(configFileName);
+    inputOperator.setCombineClass(combineClass);
+
+    ReduceOperator<K2, V2, K2, V2> reduceOpr = dag.addOperator("Reducer", new ReduceOperator<K2, V2, K2, V2>());
+    reduceOpr.setReduceClass(reduceClass);
+    reduceOpr.setConfigFile(configFileName);
+
+    HdfsKeyValOutputOperator<K2, V2> console = dag.addOperator("Console", new HdfsKeyValOutputOperator<K2, V2>());
+
+    dag.addStream("Mapped-Output", inputOperator.output, reduceOpr.input);
+    dag.addStream("Mapper-Count", inputOperator.outputCount, reduceOpr.inputCount);
+    dag.addStream("Reduced-Output", reduceOpr.output, console.input);
+  }
+
+  /**
+   * Returns the text after the last {@code '/'} of {@code path}.
+   *
+   * @param path slash-separated path, may be {@code null} or empty
+   * @return the last non-empty path component, or {@code null} when {@code path} is
+   *         {@code null}, empty or consists only of separators
+   */
+  private static String lastPathComponent(String path)
+  {
+    if (path == null || path.isEmpty()) {
+      return null;
+    }
+    String last = null;
+    StringTokenizer tokens = new StringTokenizer(path, "/");
+    // Checking hasMoreTokens() first keeps a path such as "/" from raising NoSuchElementException
+    while (tokens.hasMoreTokens()) {
+      last = tokens.nextToken();
+    }
+    return last;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
new file mode 100644
index 0000000..db32b4a
--- /dev/null
+++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mroperator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * <p>NewWordCountApplication class.</p>
+ *
+ * <p>Concrete {@link MapReduceApplication} that uses the mapper of {@link WordCount},
+ * its reducer as both reducer and combiner, and {@link TextInputFormat} for input.</p>
+ *
+ * @since 0.9.0
+ */
+@ApplicationAnnotation(name = "WordCountExample")
+public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
+{
+
+  /**
+   * Registers the map, reduce, combine and input-format classes used when the DAG is populated.
+   *
+   * This has to be a constructor (no return type). Declared as a {@code void} method
+   * with the class name it was never invoked, so none of the classes were set.
+   */
+  public NewWordCountApplication()
+  {
+    setMapClass(WordCount.Map.class);
+    setReduceClass(WordCount.Reduce.class);
+    setCombineClass(WordCount.Reduce.class);
+    setInputFormat(TextInputFormat.class);
+  }
+}