You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:12 UTC
[07/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/assemble/appPackage.xml b/examples/mobile/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mobile/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
new file mode 100644
index 0000000..f719643
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Mobile Example Application:
+ * <p>
+ * This example simulates a large number of cell phones, in the range of 40K to 200K,
+ * and tracks a given cell number across cell towers. It also displays the changing locations of the cell number on a Google map.
+ *
+ * This example demonstrates the scalability feature of the Apex platform.
+ * It showcases the ability of the platform to scale up and down as the phone numbers generated increase and decrease respectively.
+ * If the tuples processed per second by the pmove operator increase beyond 30,000, more partitions of the pmove operator get deployed until
+ * each partition processes around 10,000 to 30,000 tuples per second.
+ * If the tuples processed per second drop below 10,000, the platform merges the partitions until the partition count drops down to the original.
+ * The load can be varied using the tuplesBlast property.
+ * If the tuplesBlast is set to 200, 40K cell phones are generated.
+ * If the tuplesBlast is set to 1000, 200K cell phones are generated.
+ * The tuplesBlast property belongs to the phone number generator (operator "Receiver") and can be set using dtcli command: 'set-operator-property Receiver tuplesBlast 1000'.
+ *
+ *
+ * The specs are as such<br>
+ * Depending on the tuplesBlast property, large number of cell phone numbers are generated.
+ * They jump a cell tower frequently. Sometimes
+ * within a second sometimes in 10 seconds. The aim is to demonstrate the
+ * following abilities<br>
+ * <ul>
+ * <li>Entering queries dynamically: Phone numbers are added at run time to locate
+ * their GPS positions.</li>
+ * <li>Changing functionality dynamically: The load is changed by making
+ * functional changes on the load generator operator (phonegen)</li>
+ * <li>Auto Scale up/Down with load: Operator pmove increases and decreases
+ * partitions as per load</li>
+ * <li></li>
+ * </ul>
+ *
+ * Refer to examples/docs/MobileDemo.md for more information.
+ *
+ * <p>
+ *
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console: <br>
+ *
+ * <pre>
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1}
+ * phoneLocationQueryResult: {phone=5554995, location=(10,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1}
+ * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3}
+ * </pre>
+ *
+ * <b>Application DAG : </b><br>
+ * <img src="doc-files/Mobile.png" width="600"> <br>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "MobileExample")
+public class Application implements StreamingApplication
+{
+ public static final String PHONE_RANGE_PROP = "dt.application.MobileExample.phoneRange";
+ public static final String TOTAL_SEED_NOS = "dt.application.MobileExample.totalSeedNumbers";
+ public static final String COOL_DOWN_MILLIS = "dt.application.MobileExample.coolDownMillis";
+ public static final String MAX_THROUGHPUT = "dt.application.MobileExample.maxThroughput";
+ public static final String MIN_THROUGHPUT = "dt.application.MobileExample.minThroughput";
+ private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+ private Range<Integer> phoneRange = Range.between(5550000, 5559999);
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ String lPhoneRange = conf.get(PHONE_RANGE_PROP, null);
+ if (lPhoneRange != null) {
+ String[] tokens = lPhoneRange.split("-");
+ if (tokens.length != 2) {
+ throw new IllegalArgumentException("Invalid range: " + lPhoneRange);
+ }
+ this.phoneRange = Range.between(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+ }
+ LOG.debug("Phone range {}", this.phoneRange);
+
+ RandomEventGenerator phones = dag.addOperator("Receiver", RandomEventGenerator.class);
+ phones.setMinvalue(this.phoneRange.getMinimum());
+ phones.setMaxvalue(this.phoneRange.getMaximum());
+
+ PhoneMovementGenerator movementGen = dag.addOperator("LocationFinder", PhoneMovementGenerator.class);
+ dag.setAttribute(movementGen, OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+
+ StatelessThroughputBasedPartitioner<PhoneMovementGenerator> partitioner = new StatelessThroughputBasedPartitioner<PhoneMovementGenerator>();
+ partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 45000));
+ partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+ partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+ dag.setAttribute(movementGen, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+ dag.setAttribute(movementGen, OperatorContext.PARTITIONER, partitioner);
+
+ // generate seed numbers
+ Random random = new Random();
+ int maxPhone = phoneRange.getMaximum() - phoneRange.getMinimum();
+ int phonesToDisplay = conf.getInt(TOTAL_SEED_NOS, 10);
+ for (int i = phonesToDisplay; i-- > 0; ) {
+ int phoneNo = phoneRange.getMinimum() + random.nextInt(maxPhone + 1);
+ LOG.info("seed no: " + phoneNo);
+ movementGen.phoneRegister.add(phoneNo);
+ }
+ // done generating data
+ LOG.info("Finished generating seed data.");
+
+ String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+ URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
+ wsOut.setUri(uri);
+ PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>());
+ wsIn.setUri(uri);
+ // default partitioning: first connected stream to movementGen will be partitioned
+ dag.addStream("Phone-Data", phones.integer_data, movementGen.data);
+ dag.addStream("Results", movementGen.locationQueryResult, wsOut.input);
+ dag.addStream("Query", wsIn.outputPort, movementGen.phoneQuery);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
new file mode 100644
index 0000000..f8de357
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.util.Map;
+import java.util.Random;
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Generates mobile numbers that will be displayed in mobile example just after launch.<br>
+ * Operator attributes:
+ * <ul>
+ * <li>initialDisplayCount: No. of seed phone numbers that will be generated.</li>
+ * <li>maxSeedPhoneNumber: The largest seed phone number.</li>
+ * </ul>
+ *
+ * @since 0.3.5
+ */
+public class PhoneEntryOperator extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class);
+
+  /** Set once the seed numbers have been emitted so that they are generated only one time. */
+  private boolean seedGenerationDone = false;
+
+  @Min(0)
+  private int initialDisplayCount = 0;
+
+  private int maxSeedPhoneNumber = 0;
+  private int rangeLowerEndpoint;
+  private int rangeUpperEndpoint;
+
+  /**
+   * Sets the initial number of phones to display on the google map.
+   *
+   * @param i the count of initial phone numbers to display
+   */
+  public void setInitialDisplayCount(int i)
+  {
+    initialDisplayCount = i;
+  }
+
+  /**
+   * Sets the range for the phone numbers generated by the operator.
+   * Only the two endpoints are stored; whether the bounds are open or closed is not recorded.
+   *
+   * @param phoneRange the range within which the phone numbers are randomly generated.
+   * @throws IllegalStateException if the range lacks a lower or an upper endpoint
+   *         (raised by Guava's {@code Range.lowerEndpoint()} / {@code Range.upperEndpoint()})
+   */
+  public void setPhoneRange(Range<Integer> phoneRange)
+  {
+    this.rangeLowerEndpoint = phoneRange.lowerEndpoint();
+    this.rangeUpperEndpoint = phoneRange.upperEndpoint();
+  }
+
+  /**
+   * Sets the largest phone number that may be generated as a seed.
+   * The value is honoured only when it lies within the configured phone range;
+   * otherwise the upper endpoint of the range is used.
+   *
+   * @param number the largest seed phone number.
+   */
+  public void setMaxSeedPhoneNumber(int number)
+  {
+    this.maxSeedPhoneNumber = number;
+  }
+
+  /**
+   * Optional input port; every query tuple received is forwarded unchanged on {@link #seedPhones}.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>()
+  {
+    @Override
+    public void process(Map<String, String> tuple)
+    {
+      seedPhones.emit(tuple);
+    }
+  };
+
+  /**
+   * Output port carrying the generated "add phone" commands as well as the forwarded queries.
+   */
+  public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();
+
+  /**
+   * On the first window only, emits one add command per randomly chosen seed phone number.
+   * Numbers are drawn between the lower endpoint of the phone range and the effective
+   * maximum seed number (both inclusive).
+   *
+   * @param windowId the id of the window that begins
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (seedGenerationDone) {
+      return;
+    }
+    Random random = new Random();
+    boolean seedWithinRange = maxSeedPhoneNumber >= rangeLowerEndpoint && maxSeedPhoneNumber <= rangeUpperEndpoint;
+    // Offsets are relative to the configured lower endpoint rather than a fixed 5550000,
+    // so that ranges other than the default one produce seeds inside the range.
+    int maxPhone = (seedWithinRange ? maxSeedPhoneNumber : rangeUpperEndpoint) - rangeLowerEndpoint;
+    int phonesToDisplay = Math.min(initialDisplayCount, maxPhone);
+    for (int i = 0; i < phonesToDisplay; i++) {
+      int phoneNo = rangeLowerEndpoint + random.nextInt(maxPhone + 1);
+      LOG.info("seed no: {}", phoneNo);
+      Map<String, String> valueMap = Maps.newHashMap();
+      valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD);
+      valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo));
+      seedPhones.emit(valueMap);
+    }
+    // done generating data
+    seedGenerationDone = true;
+    LOG.info("Finished generating seed data.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
new file mode 100644
index 0000000..3a293f0
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.util.HighLow;
+
+/**
+ * <p>
+ * This operator generates the GPS locations for the phone numbers specified.
+ * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated.
+ * It supports querying the locations of a given phone number.
+ * This is a partitionable operator that can partition as the tuplesBlast increases.
+ * </p>
+ *
+ * @since 0.3.2
+ */
+public class PhoneMovementGenerator extends BaseOperator
+{
+  /**
+   * Input port receiving phone numbers. Each tuple moves that phone by at most one step per axis
+   * on a {@code range} x {@code range} grid; the new position is staged in {@code newgps} and
+   * becomes the current position at the end of the window.
+   */
+  public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>()
+  {
+    @Override
+    public void process(Integer tuple)
+    {
+      HighLow<Integer> loc = gps.get(tuple);
+      if (loc == null) {
+        // first sighting of this phone: start at a random position
+        loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range));
+        gps.put(tuple, loc);
+      }
+      int xloc = loc.getHigh();
+      int yloc = loc.getLow();
+      // Masking (instead of rotate % 4) keeps the direction cycling through 0..3
+      // even after the rotate counter overflows to a negative value.
+      int state = rotate & 3;
+
+      // Compute new location
+      int delta = random.nextInt(100);
+      if (delta >= threshold) {
+        if (state < 2) {
+          xloc++;
+        } else {
+          xloc--;
+        }
+        if (xloc < 0) {
+          xloc += range;
+        }
+      }
+      delta = random.nextInt(100);
+      if (delta >= threshold) {
+        if ((state == 1) || (state == 3)) {
+          yloc++;
+        } else {
+          yloc--;
+        }
+        if (yloc < 0) {
+          yloc += range;
+        }
+      }
+      // wrap around the upper edge of the grid
+      xloc %= range;
+      yloc %= range;
+
+      // Set new location
+      HighLow<Integer> nloc = newgps.get(tuple);
+      if (nloc == null) {
+        newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
+      } else {
+        nloc.setHigh(xloc);
+        nloc.setLow(yloc);
+      }
+      rotate++;
+    }
+  };
+
+  /**
+   * Optional input port receiving commands. The {@link #KEY_COMMAND} entry selects the action
+   * (add, addRange, del, clear); tuples without a command or with an unknown command are ignored.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
+  {
+    @Override
+    public void process(Map<String,String> tuple)
+    {
+      LOG.info("new query {}", tuple);
+      String command = tuple.get(KEY_COMMAND);
+      if (command == null) {
+        return;
+      }
+      if (command.equals(COMMAND_ADD)) {
+        commandCounters.getCounter(CommandCounters.ADD).increment();
+        registerPhone(tuple.get(KEY_PHONE));
+      } else if (command.equals(COMMAND_ADD_RANGE)) {
+        commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
+        registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
+      } else if (command.equals(COMMAND_DELETE)) {
+        commandCounters.getCounter(CommandCounters.DELETE).increment();
+        deregisterPhone(tuple.get(KEY_PHONE));
+      } else if (command.equals(COMMAND_CLEAR)) {
+        commandCounters.getCounter(CommandCounters.CLEAR).increment();
+        clearPhones();
+      }
+    }
+  };
+
+  public static final String KEY_COMMAND = "command";
+  public static final String KEY_PHONE = "phone";
+  public static final String KEY_LOCATION = "location";
+  public static final String KEY_REMOVED = "removed";
+  public static final String KEY_START_PHONE = "startPhone";
+  public static final String KEY_END_PHONE = "endPhone";
+
+  public static final String COMMAND_ADD = "add";
+  public static final String COMMAND_ADD_RANGE = "addRange";
+  public static final String COMMAND_DELETE = "del";
+  public static final String COMMAND_CLEAR = "clear";
+
+  /** Phones whose location is emitted at the end of every window. */
+  final Set<Integer> phoneRegister = Sets.newHashSet();
+
+  /** Current location of every phone seen so far. */
+  private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>();
+  private final Random random = new Random();
+  private int range = 50;
+  private int threshold = 80;
+  /** Incremented for every data tuple; its two lowest bits select the movement direction. */
+  private int rotate = 0;
+
+  protected BasicCounters<MutableLong> commandCounters;
+
+  private transient OperatorContext context;
+  /** Locations computed during the current window; merged into {@code gps} in {@link #endWindow()}. */
+  private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>();
+
+  public PhoneMovementGenerator()
+  {
+    this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class);
+  }
+
+  /**
+   * @return the size of the location grid; both coordinates lie in {@code [0, range)}
+   */
+  @Min(1)
+  public int getRange()
+  {
+    return range;
+  }
+
+  /**
+   * Sets the size of the grid on which the GPS locations are generated.
+   * The value must be positive because it is used as the bound of
+   * {@link Random#nextInt(int)} and as a modulus.
+   *
+   * @param i the grid size to set
+   */
+  public void setRange(int i)
+  {
+    range = i;
+  }
+
+  /**
+   * @return the threshold
+   */
+  @Min(0)
+  public int getThreshold()
+  {
+    return threshold;
+  }
+
+  /**
+   * Sets the threshold that decides how frequently the GPS locations are updated.
+   * For every tuple and for each axis a random number in {@code [0, 100)} is drawn and the
+   * coordinate changes only when that number is at least the threshold.
+   *
+   * @param i the value that decides how frequently the GPS locations change.
+   */
+  public void setThreshold(int i)
+  {
+    threshold = i;
+  }
+
+  /**
+   * Registers a single phone given as text; null, empty and non-numeric values are ignored.
+   */
+  private void registerPhone(String phoneStr)
+  {
+    // register the phone channel
+    if (Strings.isNullOrEmpty(phoneStr)) {
+      return;
+    }
+    try {
+      registerSinglePhone(Integer.parseInt(phoneStr));
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid no {}", phoneStr);
+    }
+  }
+
+  /**
+   * Registers every phone from {@code startPhoneStr} to {@code endPhoneStr}, both inclusive.
+   * Missing, non-numeric or reversed bounds are logged and ignored.
+   */
+  private void registerPhoneRange(String startPhoneStr, String endPhoneStr)
+  {
+    if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) {
+      LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr);
+      return;
+    }
+    try {
+      int startPhone = Integer.parseInt(startPhoneStr);
+      int endPhone = Integer.parseInt(endPhoneStr);
+      if (endPhone < startPhone) {
+        LOG.warn("Invalid phone range {} {}", startPhone, endPhone);
+        return;
+      }
+      // A long counter is used because an int counter would wrap around and never
+      // terminate the loop when endPhone equals Integer.MAX_VALUE.
+      for (long i = startPhone; i <= endPhone; i++) {
+        registerSinglePhone((int)i);
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
+    }
+  }
+
+  /**
+   * Adds the phone to the register and emits its location right away if one is known.
+   */
+  private void registerSinglePhone(int phone)
+  {
+    phoneRegister.add(phone);
+    LOG.debug("Registered query id with phone {}", phone);
+    emitQueryResult(phone);
+  }
+
+  /**
+   * Removes a phone given as text from the register and, if it was registered, emits a
+   * "removed" result; null, empty and non-numeric values are ignored.
+   */
+  private void deregisterPhone(String phoneStr)
+  {
+    if (Strings.isNullOrEmpty(phoneStr)) {
+      return;
+    }
+    try {
+      Integer phone = Integer.valueOf(phoneStr);
+      // remove the channel; remove() reports whether the phone was registered
+      if (phoneRegister.remove(phone)) {
+        LOG.debug("Removing query id {}", phone);
+        emitPhoneRemoved(phone);
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Invalid phone {}", phoneStr);
+    }
+  }
+
+  /**
+   * Removes all phones from the register; no "removed" results are emitted.
+   */
+  private void clearPhones()
+  {
+    phoneRegister.clear();
+    LOG.info("Clearing phones");
+  }
+
+  /**
+   * Output port carrying location results ({@link #KEY_PHONE} and {@link #KEY_LOCATION}) and
+   * removal notices ({@link #KEY_PHONE} and {@link #KEY_REMOVED}).
+   */
+  public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>();
+
+  /**
+   * Stores the operator context and installs a fresh counter for every command type.
+   *
+   * @param context the operator context
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    this.context = context;
+    commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
+    commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
+    commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
+    commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
+  }
+
+  /**
+   * Applies the locations computed during the window, emits the location of every registered
+   * phone, clears the staged locations and publishes the command counters.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<Integer, HighLow<Integer>> e : newgps.entrySet()) {
+      HighLow<Integer> loc = gps.get(e.getKey());
+      if (loc == null) {
+        gps.put(e.getKey(), e.getValue());
+      } else {
+        loc.setHigh(e.getValue().getHigh());
+        loc.setLow(e.getValue().getLow());
+      }
+    }
+    for (Integer phone : phoneRegister) {
+      emitQueryResult(phone);
+    }
+    if (phoneRegister.isEmpty()) {
+      LOG.debug("No phone number");
+    }
+    newgps.clear();
+    context.setCounters(commandCounters);
+  }
+
+  /**
+   * Emits the current location of the phone; nothing is emitted when no location is known yet.
+   */
+  private void emitQueryResult(Integer phone)
+  {
+    HighLow<Integer> loc = gps.get(phone);
+    if (loc != null) {
+      Map<String, String> queryResult = new HashMap<String, String>();
+      queryResult.put(KEY_PHONE, String.valueOf(phone));
+      queryResult.put(KEY_LOCATION, loc.toString());
+      locationQueryResult.emit(queryResult);
+    }
+  }
+
+  /**
+   * Emits a notice that the phone has been removed from the register.
+   */
+  private void emitPhoneRemoved(Integer phone)
+  {
+    Map<String,String> removedResult = Maps.newHashMap();
+    removedResult.put(KEY_PHONE, String.valueOf(phone));
+    removedResult.put(KEY_REMOVED,"true");
+    locationQueryResult.emit(removedResult);
+  }
+
+  /** Keys of the counters kept per command type. */
+  public enum CommandCounters
+  {
+    ADD, ADD_RANGE, DELETE, CLEAR
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png
new file mode 100644
index 0000000..a25da0d
Binary files /dev/null and b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
new file mode 100644
index 0000000..1262504
--- /dev/null
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Mobile phones tracking demonstration application.
+ */
+package org.apache.apex.examples.mobile;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/main/resources/META-INF/properties.xml b/examples/mobile/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..e213d18
--- /dev/null
+++ b/examples/mobile/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,82 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.application.MobileExample.coolDownMillis</name>
+ <value>45000</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.maxThroughput</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.minThroughput</name>
+ <value>10000</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name>
+ <value>200</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.range</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.threshold</name>
+ <value>80</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationResults.prop.topic</name>
+ <value>examples.mobile.phoneLocationQueryResult</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.QueryLocation.prop.topic</name>
+ <value>examples.mobile.phoneLocationQuery</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name>
+ <value>768</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.*.attr.JVM_OPTIONS</name>
+ <value>-Xmx128m</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+ <value>256</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
new file mode 100644
index 0000000..ce6ca41
--- /dev/null
+++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mobile;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.Servlet;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class ApplicationTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+
+ public ApplicationTest()
+ {
+ }
+
+ /**
+ * Test of getApplication method, of class Application.
+ */
+ @Test
+ public void testGetApplication() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-mobile.xml");
+ Server server = new Server(0);
+ Servlet servlet = new SamplePubSubWebSocketServlet();
+ ServletHolder sh = new ServletHolder(servlet);
+ ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
+ contextHandler.addServlet(sh, "/pubsub");
+ contextHandler.addServlet(sh, "/*");
+ server.start();
+ Connector[] connector = server.getConnectors();
+ conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
+ URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+
+ PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
+ outputOperator.setUri(uri);
+ outputOperator.setTopic(conf.get("dt.application.MobileExample.operator.QueryLocation.topic"));
+
+ PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>();
+ inputOperator.setUri(uri);
+ inputOperator.setTopic(conf.get("dt.application.MobileExample.operator.LocationResults.topic"));
+
+ CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+ inputOperator.outputPort.setSink(sink);
+
+ Map<String, String> data = new HashMap<String, String>();
+ data.put("command", "add");
+ data.put("phone", "5559990");
+
+ Application app = new Application();
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+ lc.runAsync();
+ Thread.sleep(5000);
+ inputOperator.setup(null);
+ outputOperator.setup(null);
+ inputOperator.activate(null);
+ outputOperator.beginWindow(0);
+ outputOperator.input.process(data);
+ outputOperator.endWindow();
+ inputOperator.beginWindow(0);
+ int timeoutMillis = 5000;
+ while (sink.collectedTuples.size() < 5 && timeoutMillis > 0) {
+ inputOperator.emitTuples();
+ timeoutMillis -= 20;
+ Thread.sleep(20);
+ }
+ inputOperator.endWindow();
+ lc.shutdown();
+ inputOperator.teardown();
+ outputOperator.teardown();
+ server.stop();
+ Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5);
+ for (Object obj : sink.collectedTuples) {
+ Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/dt-site-mobile.xml
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/resources/dt-site-mobile.xml b/examples/mobile/src/test/resources/dt-site-mobile.xml
new file mode 100644
index 0000000..dcb6c98
--- /dev/null
+++ b/examples/mobile/src/test/resources/dt-site-mobile.xml
@@ -0,0 +1,87 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <property>
+ <name>dt.application.MobileExample.class</name>
+ <value>org.apache.apex.examples.mobile.Application</value>
+ <description>An alias for the application</description>
+ </property>
+ <!--property>
+ <name>dt.attr.GATEWAY_CONNECT_ADDRESS</name>
+ <value>localhost:19090</value>
+ </property-->
+ <property>
+ <name>dt.application.MobileExample.totalSeedNumbers</name>
+ <value>0</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.coolDownMillis</name>
+ <value>45000</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.maxThroughput</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.minThroughput</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name>
+ <value>200</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.range</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.threshold</name>
+ <value>80</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.LocationResults.topic</name>
+ <value>resultTopic</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.QueryLocation.topic</name>
+ <value>queryTopic</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name>
+ <value>2048</value>
+ </property>
+ <property>
+ <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/mobile/src/test/resources/log4j.properties b/examples/mobile/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/mobile/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/pom.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/pom.xml b/examples/mrmonitor/pom.xml
new file mode 100644
index 0000000..527565b
--- /dev/null
+++ b/examples/mrmonitor/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-mrmonitor</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar MR Monitoring Example</name>
+ <description></description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>8.1.10.v20130312</version>
+ <scope>test</scope>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.1.10.v20130312</version>
+ <scope>test</scope>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/assemble/appPackage.xml b/examples/mrmonitor/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/mrmonitor/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
new file mode 100644
index 0000000..0557919
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.SeedEventGenerator;
+
+/**
+ * Sample streaming application for the MR monitor example.
+ * <p>
+ * Wires a {@link SeedEventGenerator} to a {@link ConsoleOutputOperator} over a single
+ * container-local stream.
+ *
+ * @since 2.0.0
+ */
+@ApplicationAnnotation(name = "MyFirstApplication")
+public class Application implements StreamingApplication
+{
+
+  /**
+   * Builds the two-operator sample DAG; replace it with the DAG you want to run.
+   *
+   * @param dag  the DAG to populate
+   * @param conf the launch configuration (not read by this sample)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Source: seeds 1..10 with two generated keys, "x" in [0, 10] and "y" in [0, 100].
+    SeedEventGenerator generator = dag.addOperator("seedGen", SeedEventGenerator.class);
+    generator.setSeedStart(1);
+    generator.setSeedEnd(10);
+    generator.addKeyData("x", 0, 10);
+    generator.addKeyData("y", 0, 100);
+
+    // Sink: prints every tuple using the given format.
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    console.setStringFormat("hello: %s");
+
+    DAG.StreamMeta seedStream = dag.addStream("seeddata", generator.val_list, console.input);
+    seedStream.setLocality(Locality.CONTAINER_LOCAL);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
new file mode 100644
index 0000000..1ed1f26
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+/**
+ * <p>Constants shared by the MR monitor example.</p>
+ *
+ * Fields declared in an interface are implicitly {@code public static final}, so the
+ * modifiers are not repeated on each declaration.
+ *
+ * @since 0.3.4
+ */
+public interface Constants
+{
+
+  /** Default upper bound on the number of jobs a single operator instance tracks. */
+  int MAX_NUMBER_OF_JOBS = 25;
+
+  /** Value of the task {@link #TASK_TYPE} field that marks a reduce task. */
+  String REDUCE_TASK_TYPE = "REDUCE";
+  /** Value of the task {@link #TASK_TYPE} field that marks a map task. */
+  String MAP_TASK_TYPE = "MAP";
+  /** Name of the task JSON field holding the task type. */
+  String TASK_TYPE = "type";
+  /** Name of the task JSON field holding the task id. */
+  String TASK_ID = "id";
+
+  /** Name of the task JSON field holding the task id in hadoop 1.x (legacy) responses. */
+  String LEAGACY_TASK_ID = "taskId";
+  /** Page size used when computing how many legacy task pages to request. */
+  int MAX_TASKS = 2000;
+
+  // Keys of the job query submitted to the monitor.
+  String QUERY_APP_ID = "app_id";
+  String QUERY_JOB_ID = "job_id";
+  String QUERY_HADOOP_VERSION = "hadoop_version";
+  String QUERY_API_VERSION = "api_version";
+  String QUERY_RM_PORT = "rm_port";
+  String QUERY_HS_PORT = "hs_port";
+  String QUERY_HOST_NAME = "hostname";
+  String QUERY_KEY_COMMAND = "command";
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
new file mode 100644
index 0000000..24b3887
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.examples.mrmonitor.MRStatusObject.TaskObject;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+
+/**
+ * <p>
+ * MRJobStatusOperator class.
+ * </p>
+ *
+ * @since 0.3.4
+ */
+public class MRJobStatusOperator implements Operator, IdleTimeHandler
+{
+ private static final Logger LOG = LoggerFactory.getLogger(MRJobStatusOperator.class);
+
+ private static final String JOB_PREFIX = "job_";
+ /**
+ * This outputs the meta information of the job
+ */
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+ /**
+ * This outputs the map task information of the job
+ */
+ public final transient DefaultOutputPort<String> mapOutput = new DefaultOutputPort<String>();
+ /**
+ * This outputs the reduce task information of the job
+ */
+ public final transient DefaultOutputPort<String> reduceOutput = new DefaultOutputPort<String>();
+ /**
+ * This outputs the counter information of the job
+ */
+ public final transient DefaultOutputPort<String> counterOutput = new DefaultOutputPort<String>();
+ /**
+ * This is the time in milliseconds to wait before making a new request for data
+ */
+ private transient int sleepTime = 100;
+ /**
+ * This is the number of consecutive windows of no change before the job is removed from map
+ */
+ private int maxRetrials = 10;
+ /**
+ * The number of minutes for which the status history of map and reduce tasks is stored
+ */
+ private int statusHistoryTime = 60;
+ private Map<String, MRStatusObject> jobMap = new HashMap<String, MRStatusObject>();
+ /**
+ * This represents the maximum number of jobs a single instance of this operator is going to serve at any time
+ */
+ private int maxJobs = Constants.MAX_NUMBER_OF_JOBS;
+ private transient Iterator<MRStatusObject> iterator;
+
+ /**
+  * Input port receiving job requests. Each request originates from a map of the following format
+  * {"app_id":<>,"hadoop_version":<>,"api_version":<>,"command":<>,
+  * "hostname":<>,"hs_port":<>,"rm_port":<>,"job_id":<>}
+  * <p>
+  * A "delete" command removes the job and emits a removal notice, a "clear" command empties the
+  * job map, and any other request fetches the current job status and emits it.
+  * The {@code maxJobs} limit only rejects requests for jobs that are not tracked yet:
+  * "delete" and "clear" must keep working when the map is full (otherwise it could never be
+  * emptied by command), and refreshing an already tracked job does not grow the map.
+  */
+ public final transient DefaultInputPort<MRStatusObject> input = new DefaultInputPort<MRStatusObject>()
+ {
+   @Override
+   public void process(MRStatusObject mrStatusObj)
+   {
+
+     if (jobMap == null) {
+       jobMap = new HashMap<String, MRStatusObject>();
+     }
+
+     if ("delete".equalsIgnoreCase(mrStatusObj.getCommand())) {
+       removeJob(mrStatusObj.getJobId());
+       JSONObject outputJsonObject = new JSONObject();
+       try {
+         outputJsonObject.put("id", mrStatusObj.getJobId());
+         outputJsonObject.put("removed", "true");
+         output.emit(outputJsonObject.toString());
+       } catch (JSONException e) {
+         LOG.warn("Error creating JSON: {}", e.getMessage());
+       }
+       return;
+     }
+     if ("clear".equalsIgnoreCase(mrStatusObj.getCommand())) {
+       clearMap();
+       return;
+     }
+
+     MRStatusObject trackedJob = jobMap.get(mrStatusObj.getJobId());
+     if (trackedJob != null) {
+       // Continue with the stored object so previously collected state is kept.
+       mrStatusObj = trackedJob;
+     } else if (jobMap.size() >= maxJobs) {
+       // At capacity: refuse to start tracking another job.
+       return;
+     }
+
+     if (mrStatusObj.getHadoopVersion() == 2) {
+       getJsonForJob(mrStatusObj);
+     } else if (mrStatusObj.getHadoopVersion() == 1) {
+       getJsonForLegacyJob(mrStatusObj);
+     }
+     mrStatusObj.setStatusHistoryCount(statusHistoryTime);
+     // The job map may have gained an entry; restart the idle-time iteration on a fresh iterator.
+     iterator = jobMap.values().iterator();
+     emitHelper(mrStatusObj);
+   }
+ };
+
+ /**
+  * @return the number of minutes for which the status history of map and reduce tasks is stored
+  */
+ public int getStatusHistoryTime()
+ {
+   return this.statusHistoryTime;
+ }
+
+ /**
+  * Sets the status history time and pushes the new value to every job currently tracked.
+  *
+  * @param statusHistoryTime number of minutes of task status history to keep
+  */
+ public void setStatusHistoryTime(int statusHistoryTime)
+ {
+   this.statusHistoryTime = statusHistoryTime;
+   if (jobMap == null || jobMap.isEmpty()) {
+     return;
+   }
+   for (MRStatusObject trackedJob : jobMap.values()) {
+     trackedJob.setStatusHistoryCount(statusHistoryTime);
+   }
+ }
+
+ /**
+  * Fetches the latest status of a job submitted on hadoop 2.x.
+  * <p>
+  * The job is first requested through the application proxy on the resource manager port; when
+  * that yields no JSON the history server port is tried. Task and counter information is
+  * refreshed in both the changed and the unchanged case, but the job is only marked modified
+  * and (re)stored in the job map when its JSON differs from the stored one.
+  *
+  * @param statusObj the job to refresh
+  */
+ private void getJsonForJob(MRStatusObject statusObj)
+ {
+   String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId();
+   JSONObject latest = MRUtil.getJsonObject(MRUtil.getJsonForURL(url));
+
+   if (latest == null) {
+     // Fall back to the history server.
+     url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId();
+     latest = MRUtil.getJsonObject(MRUtil.getJsonForURL(url));
+   }
+   if (latest == null) {
+     return;
+   }
+
+   MRStatusObject tracked = jobMap.get(statusObj.getJobId());
+   boolean unchanged = tracked != null && tracked.getJsonObject().toString().equals(latest.toString());
+   if (unchanged) {
+     getJsonsForTasks(statusObj);
+     getCounterInfoForJob(statusObj);
+     return;
+   }
+
+   statusObj.setModified(true);
+   statusObj.setJsonObject(latest);
+   getCounterInfoForJob(statusObj);
+   getJsonsForTasks(statusObj);
+   jobMap.put(statusObj.getJobId(), statusObj);
+ }
+
+ /**
+  * Collects the counter (metric) information of a job.
+  * <p>
+  * The counters are first requested through the application proxy on the resource manager port;
+  * when that yields no JSON the history server port is tried. A metric object is created on the
+  * first successful fetch; afterwards it is updated and flagged as modified only when the
+  * counter JSON changed. The comparison is case-sensitive, like every other change check in
+  * this operator, so a change that differs only in letter case is still detected.
+  *
+  * @param statusObj the job whose counters are refreshed
+  */
+ private void getCounterInfoForJob(MRStatusObject statusObj)
+ {
+   String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
+   String responseBody = MRUtil.getJsonForURL(url);
+   JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+   if (jsonObj == null) {
+     // Fall back to the history server.
+     url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters";
+     responseBody = MRUtil.getJsonForURL(url);
+     jsonObj = MRUtil.getJsonObject(responseBody);
+   }
+   if (jsonObj == null) {
+     return;
+   }
+
+   TaskObject metrics = statusObj.getMetricObject();
+   if (metrics == null) {
+     statusObj.setMetricObject(new TaskObject(jsonObj));
+   } else if (!metrics.getJsonString().equals(jsonObj.toString())) {
+     metrics.setJson(jsonObj);
+     metrics.setModified(true);
+   }
+ }
+
+ /**
+  * Fetches the latest status of the tasks of a job submitted on hadoop 2.x.
+  * <p>
+  * The task list is first requested through the application proxy on the resource manager port;
+  * when that yields no JSON the history server port is tried. Every task is filed under its id
+  * in the reduce map when its type is REDUCE and in the map-task map otherwise. Unknown tasks
+  * are added, known tasks are updated and flagged as modified only when their JSON changed.
+  * Any exception raised while processing the response is logged and ends the refresh.
+  *
+  * @param statusObj the job whose tasks are refreshed
+  */
+ private void getJsonsForTasks(MRStatusObject statusObj)
+ {
+   String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
+   JSONObject jsonObj = MRUtil.getJsonObject(MRUtil.getJsonForURL(url));
+   if (jsonObj == null) {
+     // Fall back to the history server.
+     url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/";
+     jsonObj = MRUtil.getJsonObject(MRUtil.getJsonForURL(url));
+   }
+   if (jsonObj == null) {
+     return;
+   }
+
+   try {
+     Map<String, TaskObject> mapTasks = statusObj.getMapJsonObject();
+     Map<String, TaskObject> reduceTasks = statusObj.getReduceJsonObject();
+     JSONArray tasks = jsonObj.getJSONObject("tasks").getJSONArray("task");
+
+     for (int idx = 0; idx < tasks.length(); idx++) {
+       JSONObject task = tasks.getJSONObject(idx);
+       boolean isReduce = Constants.REDUCE_TASK_TYPE.equalsIgnoreCase(task.getString(Constants.TASK_TYPE));
+       Map<String, TaskObject> target = isReduce ? reduceTasks : mapTasks;
+       String taskId = task.getString(Constants.TASK_ID);
+
+       TaskObject known = target.get(taskId);
+       if (known == null) {
+         target.put(taskId, new TaskObject(task));
+       } else if (!known.getJsonString().equals(task.toString())) {
+         known.setJson(task);
+         known.setModified(true);
+         target.put(taskId, known);
+       }
+     }
+     statusObj.setMapJsonObject(mapTasks);
+     statusObj.setReduceJsonObject(reduceTasks);
+   } catch (Exception e) {
+     LOG.info("exception: {}", e.getMessage());
+   }
+ }
+
+ /**
+  * Fetches the latest status of a job submitted on hadoop 1.x from the legacy
+  * {@code jobdetails.jsp} endpoint.
+  * <p>
+  * Map and reduce task information is refreshed whenever the job JSON could be fetched. The job
+  * is marked modified and (re)stored in the job map only when it is not tracked yet or its JSON
+  * differs from the stored one.
+  *
+  * @param statusObj the job to refresh
+  */
+ private void getJsonForLegacyJob(MRStatusObject statusObj)
+ {
+   String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobdetails.jsp?format=json&jobid=job_" + statusObj.getJobId();
+   JSONObject latest = MRUtil.getJsonObject(MRUtil.getJsonForURL(url));
+   if (latest == null) {
+     return;
+   }
+
+   MRStatusObject tracked = jobMap.get(statusObj.getJobId());
+   boolean changed = tracked == null || !tracked.getJsonObject().toString().equals(latest.toString());
+   if (changed) {
+     statusObj.setModified(true);
+     statusObj.setJsonObject(latest);
+   }
+
+   getJsonsForLegacyTasks(statusObj, "map");
+   getJsonsForLegacyTasks(statusObj, "reduce");
+
+   if (changed) {
+     jobMap.put(statusObj.getJobId(), statusObj);
+   }
+ }
+
+ /**
+  * Fetches the latest status of the map or reduce tasks of a job submitted on hadoop 1.x from
+  * the legacy {@code jobtasks.jsp} endpoint.
+  * <p>
+  * The number of pages to request is derived from the task total found in the job JSON and
+  * {@link Constants#MAX_TASKS}. Tasks are keyed by {@link Constants#LEAGACY_TASK_ID}
+  * consistently for lookup, insert and update; unknown tasks are added, known tasks are updated
+  * and flagged as modified only when their JSON changed. A page that yields no JSON ends the
+  * refresh; any exception is logged and ends the refresh as well.
+  *
+  * @param statusObj the job whose tasks are refreshed
+  * @param type      "map" for map tasks, anything else is treated as reduce tasks
+  */
+ private void getJsonsForLegacyTasks(MRStatusObject statusObj, String type)
+ {
+   try {
+     JSONObject jobJson = statusObj.getJsonObject();
+     int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
+     boolean isMap = type.equalsIgnoreCase("map");
+     Map<String, TaskObject> taskMap = isMap ? statusObj.getMapJsonObject() : statusObj.getReduceJsonObject();
+
+     int totalPagenums = (totalTasks / Constants.MAX_TASKS) + 1;
+     String baseUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobtasks.jsp?type=" + type + "&format=json&jobid=job_" + statusObj.getJobId() + "&pagenum=";
+
+     for (int pagenum = 1; pagenum <= totalPagenums; pagenum++) {
+       String responseBody = MRUtil.getJsonForURL(baseUrl + pagenum);
+       JSONObject jsonObj = MRUtil.getJsonObject(responseBody);
+       if (jsonObj == null) {
+         return;
+       }
+
+       JSONArray taskJsonArray = jsonObj.getJSONArray("tasksInfo");
+       for (int i = 0; i < taskJsonArray.length(); i++) {
+         JSONObject taskObj = taskJsonArray.getJSONObject(i);
+         // Legacy responses identify a task by "taskId"; use that single key throughout.
+         String taskId = taskObj.getString(Constants.LEAGACY_TASK_ID);
+
+         TaskObject known = taskMap.get(taskId);
+         if (known == null) {
+           taskMap.put(taskId, new TaskObject(taskObj));
+         } else if (!known.getJsonString().equals(taskObj.toString())) {
+           known.setJson(taskObj);
+           known.setModified(true);
+           taskMap.put(taskId, known);
+         }
+       }
+     }
+
+     if (isMap) {
+       statusObj.setMapJsonObject(taskMap);
+     } else {
+       statusObj.setReduceJsonObject(taskMap);
+     }
+   } catch (Exception e) {
+     LOG.info(e.getMessage());
+   }
+
+ }
+
+ /**
+  * Sleeps for the configured spin interval and then refreshes the status of one tracked job,
+  * walking through the job map one entry per call and starting over once the end is reached.
+  * If the sleep is interrupted, the interrupt flag is restored and no job is polled in this call.
+  */
+ @Override
+ public void handleIdleTime()
+ {
+   try {
+     Thread.sleep(sleepTime);
+   } catch (InterruptedException ie) {
+     // Do not swallow the interruption: restore the flag so the calling code can react to it,
+     // and skip the remote status request for this call.
+     Thread.currentThread().interrupt();
+     return;
+   }
+
+   if (!iterator.hasNext()) {
+     iterator = jobMap.values().iterator();
+   }
+
+   if (iterator.hasNext()) {
+     MRStatusObject obj = iterator.next();
+     if (obj.getHadoopVersion() == 2) {
+       getJsonForJob(obj);
+     } else if (obj.getHadoopVersion() == 1) {
+       getJsonForLegacyJob(obj);
+     }
+   }
+ }
+
+ /**
+  * Positions the polling iterator at the start of the tracked jobs and reads the idle sleep
+  * interval from the operator context.
+  *
+  * @param context operator context supplying the SPIN_MILLIS attribute
+  */
+ @Override
+ public void setup(OperatorContext context)
+ {
+   this.iterator = this.jobMap.values().iterator();
+   this.sleepTime = context.getValue(OperatorContext.SPIN_MILLIS);
+ }
+
+ @Override
+ public void teardown()
+ { // Intentionally empty: this operator performs no work on teardown.
+ }
+
+ @Override
+ public void beginWindow(long arg0)
+ { // Intentionally empty: the window id (arg0) is unused; per-window output is produced in endWindow().
+ }
+
+ private void emitHelper(MRStatusObject obj)
+ {
+ try {
+ obj.setModified(false);
+ output.emit(obj.getJsonObject().toString());
+ JSONObject outputJsonObject = new JSONObject();
+
+ outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+ outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory()));
+ outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory()));
+ outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory()));
+ outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory()));
+ outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory()));
+ output.emit(outputJsonObject.toString());
+ obj.setChangedHistoryStatus(false);
+
+ outputJsonObject = new JSONObject();
+ outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+ JSONArray arr = new JSONArray();
+
+ for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) {
+ TaskObject json = mapEntry.getValue();
+ json.setModified(false);
+ arr.put(json.getJson());
+ }
+
+ outputJsonObject.put("tasks", arr);
+ mapOutput.emit(outputJsonObject.toString());
+
+ outputJsonObject = new JSONObject();
+ outputJsonObject.put("id", JOB_PREFIX + obj.getJobId());
+ arr = new JSONArray();
+
+ for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) {
+ TaskObject json = mapEntry.getValue();
+ json.setModified(false);
+ arr.put(json.getJson());
+ }
+
+ outputJsonObject.put("tasks", arr);
+ reduceOutput.emit(outputJsonObject.toString());
+ obj.setRetrials(0);
+ } catch (Exception e) {
+ LOG.warn("error creating json {}", e.getMessage());
+ }
+
+ }
+
+ /**
+  * Emits, for every tracked job, whatever changed since the last emission and maintains the
+  * per-job retrial counter: a job with changes gets its counter reset, a job without changes gets
+  * it incremented, and a job without changes whose counter has reached {@code maxRetrials} is
+  * removed from the job map.
+  *
+  * Each job is handled independently, so a failure while emitting one job is logged and does not
+  * prevent the remaining jobs from being emitted in the same window. The retrial counter of a
+  * failing job is left as it is.
+  */
+ @Override
+ public void endWindow()
+ {
+   if (jobMap == null) {
+     return;
+   }
+
+   List<String> delList = new ArrayList<String>();
+   for (Map.Entry<String, MRStatusObject> entry : jobMap.entrySet()) {
+     try {
+       MRStatusObject obj = entry.getValue();
+       if (emitPendingUpdates(obj)) {
+         obj.setRetrials(0);
+       } else if (obj.getRetrials() >= maxRetrials) {
+         delList.add(obj.getJobId());
+       } else {
+         obj.setRetrials(obj.getRetrials() + 1);
+       }
+     } catch (Exception ex) {
+       // Pass the exception itself so the stack trace is kept; getMessage() alone may be null.
+       LOG.warn("error creating json for job {}", entry.getKey(), ex);
+     }
+   }
+
+   // Removal is deferred until the iteration over jobMap has finished.
+   for (String jobId : delList) {
+     removeJob(jobId);
+   }
+ }
+
+ /**
+  * Emits the modified parts of one job (job JSON and histories, map tasks, reduce tasks, metrics)
+  * on their ports and clears the corresponding modification flags.
+  *
+  * @param obj the job to inspect
+  * @return true if anything was flagged as modified for this job
+  * @throws Exception if building or serializing any of the JSON messages fails
+  */
+ private boolean emitPendingUpdates(MRStatusObject obj) throws Exception
+ {
+   String id = JOB_PREFIX + obj.getJobId();
+   boolean modified = false;
+
+   if (obj.isModified()) {
+     modified = true;
+     obj.setModified(false);
+     output.emit(obj.getJsonObject().toString());
+     if (obj.isChangedHistoryStatus()) {
+       JSONObject historyMessage = new JSONObject();
+       historyMessage.put("id", id);
+       historyMessage.put("mapHistory", new JSONArray(obj.getMapStatusHistory()));
+       historyMessage.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory()));
+       historyMessage.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory()));
+       historyMessage.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory()));
+       historyMessage.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory()));
+       output.emit(historyMessage.toString());
+       obj.setChangedHistoryStatus(false);
+     }
+   }
+
+   JSONArray mapTasks = drainModifiedTasks(obj.getMapJsonObject());
+   if (mapTasks.length() > 0) {
+     modified = true;
+     JSONObject mapMessage = new JSONObject();
+     mapMessage.put("id", id);
+     mapMessage.put("tasks", mapTasks);
+     mapOutput.emit(mapMessage.toString());
+   }
+
+   JSONArray reduceTasks = drainModifiedTasks(obj.getReduceJsonObject());
+   if (reduceTasks.length() > 0) {
+     modified = true;
+     JSONObject reduceMessage = new JSONObject();
+     reduceMessage.put("id", id);
+     reduceMessage.put("tasks", reduceTasks);
+     reduceOutput.emit(reduceMessage.toString());
+   }
+
+   if (obj.getMetricObject() != null && obj.getMetricObject().isModified()) {
+     modified = true;
+     obj.getMetricObject().setModified(false);
+     counterOutput.emit(obj.getMetricObject().getJsonString());
+   }
+
+   return modified;
+ }
+
+ /**
+  * Collects the JSON of every task flagged as modified and clears the flag on each of them.
+  *
+  * @param tasks task map of a job
+  * @return array with the JSON of the modified tasks; empty if none was modified
+  * @throws Exception if a task's JSON cannot be added to the array
+  */
+ private JSONArray drainModifiedTasks(Map<String, TaskObject> tasks) throws Exception
+ {
+   JSONArray changed = new JSONArray();
+   for (TaskObject task : tasks.values()) {
+     if (task.isModified()) {
+       task.setModified(false);
+       changed.put(task.getJson());
+     }
+   }
+   return changed;
+ }
+
+ /**
+  * Stops tracking a job: removes it from the job map and restarts the polling iterator over the
+  * jobs that remain. Does nothing when the job map is null.
+  *
+  * @param jobId id of the job to remove
+  */
+ public void removeJob(String jobId)
+ {
+   if (jobMap == null) {
+     return;
+   }
+   jobMap.remove(jobId);
+   iterator = jobMap.values().iterator();
+ }
+
+ /**
+  * Stops tracking all jobs: empties the job map and restarts the polling iterator over the now
+  * empty map. Does nothing when the job map is null.
+  */
+ public void clearMap()
+ {
+   if (jobMap == null) {
+     return;
+   }
+   jobMap.clear();
+   iterator = jobMap.values().iterator();
+ }
+
+ /**
+ * Returns the maximum number of jobs a single instance of this operator is going to serve at any time.
+ *
+ * @return the configured maximum number of jobs
+ */
+ public int getMaxJobs()
+ {
+ return maxJobs;
+ }
+
+ /**
+ * Sets the maximum number of jobs a single instance of this operator is going to serve at any time.
+ *
+ * @param maxJobs the maximum number of jobs
+ */
+ public void setMaxJobs(int maxJobs)
+ {
+ this.maxJobs = maxJobs;
+ }
+
+ /**
+ * Returns the number of consecutive windows of no change before the job is removed from the map.
+ *
+ * @return the configured number of windows without change
+ */
+ public int getMaxRetrials()
+ {
+ return maxRetrials;
+ }
+
+ /**
+ * Sets the number of consecutive windows of no change before the job is removed from the map.
+ *
+ * @param maxRetrials the number of windows without change
+ */
+ public void setMaxRetrials(int maxRetrials)
+ {
+ this.maxRetrials = maxRetrials;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
new file mode 100644
index 0000000..288da84
--- /dev/null
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.mrmonitor;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+
+/**
+ * <p>
+ * MRMonitoringApplication class.
+ * </p>
+ * Builds a DAG in which queries received over a pub/sub WebSocket are converted by a
+ * MapToMRObjectOperator and handed to an MRJobStatusOperator, whose job, map task, reduce task and
+ * counter outputs are each published through their own pub/sub WebSocket output operator.
+ *
+ * @since 0.3.4
+ */
+@ApplicationAnnotation(name = "MRMonitoringExample")
+public class MRMonitoringApplication implements StreamingApplication
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(MRMonitoringApplication.class);
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    final String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+    final MRJobStatusOperator jobMonitor = dag.addOperator("JobMonitor", new MRJobStatusOperator());
+    final URI pubSubUri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    logger.info("WebSocket with daemon at {}", gatewayAddress);
+
+    // Query path: WebSocket input -> converter -> job monitor.
+    final PubSubWebSocketInputOperator queryIn = dag.addOperator("Query", new PubSubWebSocketInputOperator());
+    queryIn.setUri(pubSubUri);
+    final MapToMRObjectOperator queryConverter = dag.addOperator("QueryConverter", new MapToMRObjectOperator());
+
+    // One WebSocket sink per job monitor output: job meta data, map tasks, reduce tasks, metrics.
+    final PubSubWebSocketOutputOperator<Object> jobOut = addPubSubSink(dag, "JobOutput", pubSubUri);
+    final PubSubWebSocketOutputOperator<Object> mapOut = addPubSubSink(dag, "MapJob", pubSubUri);
+    final PubSubWebSocketOutputOperator<Object> reduceOut = addPubSubSink(dag, "ReduceJob", pubSubUri);
+    final PubSubWebSocketOutputOperator<Object> counterOut = addPubSubSink(dag, "JobCounter", pubSubUri);
+
+    dag.addStream("QueryConversion", queryIn.outputPort, queryConverter.input);
+    dag.addStream("QueryProcessing", queryConverter.output, jobMonitor.input);
+    dag.addStream("JobData", jobMonitor.output, jobOut.input);
+    dag.addStream("MapData", jobMonitor.mapOutput, mapOut.input);
+    dag.addStream("ReduceData", jobMonitor.reduceOutput, reduceOut.input);
+    dag.addStream("CounterData", jobMonitor.counterOutput, counterOut.input);
+  }
+
+  /**
+   * Registers a pub/sub WebSocket output operator under the given name and points it at the
+   * given URI.
+   */
+  private static PubSubWebSocketOutputOperator<Object> addPubSubSink(DAG dag, String name, URI uri)
+  {
+    PubSubWebSocketOutputOperator<Object> sink = dag.addOperator(name, new PubSubWebSocketOutputOperator<Object>());
+    sink.setUri(uri);
+    return sink;
+  }
+
+}