You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:21 UTC
[3/6] apex-malhar git commit: Fixed checkstyle errors for demos.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
index d140f77..8f68dab 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/CalculatorOperator.java
@@ -20,8 +20,6 @@ package com.datatorrent.demos.machinedata.operator;
import java.io.Serializable;
import java.math.BigDecimal;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -33,14 +31,16 @@ import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.BaseOperator;
-
-import com.datatorrent.demos.machinedata.data.*;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+import com.datatorrent.demos.machinedata.data.ResourceType;
import com.datatorrent.demos.machinedata.util.DataTable;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.KeyValPair;
/**
* <p>
@@ -179,13 +179,12 @@ public class CalculatorOperator extends BaseOperator
{
double val = (kthPercentile * sorted.size()) / 100.0;
- if (val == (int) val) {
+ if (val == (int)val) {
// Whole number
- int idx = (int) val - 1;
+ int idx = (int)val - 1;
return (sorted.get(idx) + sorted.get(idx + 1)) / 2.0;
- }
- else {
- int idx = (int) Math.round(val) - 1;
+ } else {
+ int idx = (int)Math.round(val) - 1;
return sorted.get(idx);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
index 29f700f..bbfd547 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingOperator.java
@@ -18,18 +18,6 @@
*/
package com.datatorrent.demos.machinedata.operator;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.AverageData;
-import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.lib.util.KeyValPair;
-
-import com.google.common.collect.Maps;
-
import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -38,6 +26,18 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.demos.machinedata.data.AverageData;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+import com.datatorrent.lib.util.KeyHashValPair;
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* This class calculates the average for various resources across different devices for a given key
* <p>MachineInfoAveragingOperator class.</p>
@@ -184,7 +184,7 @@ public class MachineInfoAveragingOperator extends BaseOperator
{
StringBuilder sb = new StringBuilder();
if (key instanceof MachineKey) {
- MachineKey mkey = (MachineKey) key;
+ MachineKey mkey = (MachineKey)key;
Integer customer = mkey.getCustomer();
if (customer != null) {
sb.append("customer: " + customer + "\n");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
index 15e6f07..cb5fa5a 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingPrerequisitesOperator.java
@@ -18,19 +18,18 @@
*/
package com.datatorrent.demos.machinedata.operator;
-import com.datatorrent.common.util.BaseOperator;
+import java.util.HashMap;
+import java.util.Map;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.demos.machinedata.data.AverageData;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
import com.datatorrent.lib.util.KeyHashValPair;
-
-import java.util.HashMap;
-import java.util.Map;
-
/**
* This class calculates the partial sum and count for tuples generated by upstream Operator
* <p> MachineInfoAveragingPrerequisitesOperator class. </p>
@@ -51,8 +50,6 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
MachineInfoAveragingUnifier unifier = new MachineInfoAveragingUnifier();
return unifier;
}
-
- ;
};
public transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
@@ -66,8 +63,7 @@ public class MachineInfoAveragingPrerequisitesOperator extends BaseOperator
if (averageData == null) {
averageData = new AverageData(tuple.getCpu(), tuple.getHdd(), tuple.getRam(), 1);
sums.put(key, averageData);
- }
- else {
+ } else {
averageData.setCpu(averageData.getCpu() + tuple.getCpu());
averageData.setRam(averageData.getRam() + tuple.getRam());
averageData.setHdd(averageData.getHdd() + tuple.getHdd());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
index 40995b2..e0b67f3 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/operator/MachineInfoAveragingUnifier.java
@@ -21,8 +21,8 @@ package com.datatorrent.demos.machinedata.operator;
import java.util.HashMap;
import java.util.Map;
-import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator.Unifier;
import com.datatorrent.demos.machinedata.data.AverageData;
@@ -80,8 +80,7 @@ public class MachineInfoAveragingUnifier implements Unifier<KeyHashValPair<Machi
AverageData tupleValue = arg0.getValue();
if (averageData == null) {
sums.put(tupleKey, tupleValue);
- }
- else {
+ } else {
averageData.setCpu(averageData.getCpu() + tupleValue.getCpu());
averageData.setRam(averageData.getRam() + tupleValue.getRam());
averageData.setHdd(averageData.getHdd() + tupleValue.getHdd());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
index 88ee35d..6c4256a 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/Combinatorics.java
@@ -18,7 +18,11 @@
*/
package com.datatorrent.demos.machinedata.util;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* Generate combinations of elements for the given array of elements.
@@ -27,66 +31,71 @@ import java.util.*;
*
* @since 0.3.5
*/
-public class Combinatorics<T> {
+public class Combinatorics<T>
+{
- private T[] values;
- private int size = -1;
- private List<T> result;
- private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
- private int resultMapSize = 0;
+ private T[] values;
+ private int size = -1;
+ private List<T> result;
+ private Map<Integer, List<T>> resultMap = new HashMap<Integer, List<T>>();
+ private int resultMapSize = 0;
- /**
- * Generates all possible combinations with all the sizes.
- *
- * @param values
- */
- public Combinatorics(T[] values) {
- this.values = values;
- this.size = -1;
- this.result = new ArrayList<>();
- }
+ /**
+ * Generates all possible combinations with all the sizes.
+ *
+ * @param values
+ */
+ public Combinatorics(T[] values)
+ {
+ this.values = values;
+ this.size = -1;
+ this.result = new ArrayList<>();
+ }
- /**
- * Generates all possible combinations with the given size.
- *
- * @param values
- * @param size
- */
- public Combinatorics(T[] values, int size) {
- this.values = values;
- this.size = size;
- this.result = new ArrayList<>();
- }
+ /**
+ * Generates all possible combinations with the given size.
+ *
+ * @param values
+ * @param size
+ */
+ public Combinatorics(T[] values, int size)
+ {
+ this.values = values;
+ this.size = size;
+ this.result = new ArrayList<>();
+ }
- public Map<Integer, List<T>> generate() {
+ public Map<Integer, List<T>> generate()
+ {
- if (size == -1) {
- size = values.length;
- for (int i = 1; i <= size; i++) {
- int[] tmp = new int[i];
- Arrays.fill(tmp, -1);
- generateCombinations(0, 0, tmp);
- }
- } else {
- int[] tmp = new int[size];
- Arrays.fill(tmp, -1);
- generateCombinations(0, 0, tmp);
- }
- return resultMap;
+ if (size == -1) {
+ size = values.length;
+ for (int i = 1; i <= size; i++) {
+ int[] tmp = new int[i];
+ Arrays.fill(tmp, -1);
+ generateCombinations(0, 0, tmp);
+ }
+ } else {
+ int[] tmp = new int[size];
+ Arrays.fill(tmp, -1);
+ generateCombinations(0, 0, tmp);
}
+ return resultMap;
+ }
- public void generateCombinations(int start, int depth, int[] tmp) {
- if (depth == tmp.length) {
- for (int j = 0; j < depth; j++) {
- result.add(values[tmp[j]]);
- }
- resultMap.put(++resultMapSize, result);
- result = new ArrayList<>();
- return;
- }
- for (int i = start; i < values.length; i++) {
- tmp[depth] = i;
- generateCombinations(i + 1, depth + 1, tmp);
- }
+ public void generateCombinations(int start, int depth, int[] tmp)
+ {
+ if (depth == tmp.length) {
+ for (int j = 0; j < depth; j++) {
+ result.add(values[tmp[j]]);
+ }
+ resultMap.put(++resultMapSize, result);
+ result = new ArrayList<>();
+ return;
+ }
+ for (int i = start; i < values.length; i++) {
+ tmp[depth] = i;
+ generateCombinations(i + 1, depth + 1, tmp);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
index 8820400..f8f2d33 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/util/DataTable.java
@@ -30,36 +30,46 @@ import com.google.common.collect.Maps;
*
* @since 0.3.5
*/
-public class DataTable<R,C,E> {
+public class DataTable<R,C,E>
+{
- //machineKey, [cpu,ram,hdd] -> value
- private final Map<R,Map<C,E>> table= Maps.newHashMap();
+ //machineKey, [cpu,ram,hdd] -> value
+ private final Map<R,Map<C,E>> table = Maps.newHashMap();
- public boolean containsRow(R rowKey){
- return table.containsKey(rowKey);
- }
+ public boolean containsRow(R rowKey)
+ {
+ return table.containsKey(rowKey);
+ }
- public void put(R rowKey,C colKey, E entry){
- if(!containsRow(rowKey)){
- table.put(rowKey, Maps.<C,E>newHashMap());
- }
- table.get(rowKey).put(colKey, entry);
- }
+ public void put(R rowKey,C colKey, E entry)
+ {
+ if (!containsRow(rowKey)) {
+ table.put(rowKey, Maps.<C,E>newHashMap());
+ }
+ table.get(rowKey).put(colKey, entry);
+ }
- @Nullable public E get(R rowKey, C colKey){
- if(!containsRow(rowKey)) return null;
- return table.get(rowKey).get(colKey);
- }
+ @Nullable
+ public E get(R rowKey, C colKey)
+ {
+ if (!containsRow(rowKey)) {
+ return null;
+ }
+ return table.get(rowKey).get(colKey);
+ }
- public Set<R> rowKeySet(){
- return table.keySet();
- }
+ public Set<R> rowKeySet()
+ {
+ return table.keySet();
+ }
- public void clear(){
- table.clear();
- }
+ public void clear()
+ {
+ table.clear();
+ }
- public Map<R,Map<C,E>> getTable(){
- return table;
- }
+ public Map<R,Map<C,E>> getTable()
+ {
+ return table;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
index 1a26bd1..0e397be 100644
--- a/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
+++ b/demos/machinedata/src/test/java/com/datatorrent/demos/machinedata/CalculatorOperatorTest.java
@@ -18,6 +18,20 @@
*/
package com.datatorrent.demos.machinedata;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
import com.datatorrent.demos.machinedata.data.MachineInfo;
import com.datatorrent.demos.machinedata.data.MachineKey;
import com.datatorrent.demos.machinedata.data.ResourceType;
@@ -26,20 +40,6 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.TimeBucketKey;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
/**
* @since 0.3.5
*/
@@ -94,7 +94,7 @@ public class CalculatorOperatorTest
Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
Assert.assertEquals("emitted value for 'cpu' was ", 2.0, keyValPair.getValue().get(ResourceType.CPU), 0);
Assert.assertEquals("emitted value for 'hdd' was ", 1.0, keyValPair.getValue().get(ResourceType.HDD), 0);
Assert.assertEquals("emitted value for 'ram' was ", 1.0, keyValPair.getValue().get(ResourceType.RAM), 0);
@@ -132,7 +132,7 @@ public class CalculatorOperatorTest
Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
Assert.assertEquals("emitted value for 'cpu' was ", getSD(ImmutableList.of(1, 2, 3)), keyValPair.getValue().get(ResourceType.CPU), 0);
Assert.assertEquals("emitted value for 'hdd' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.HDD), 0);
Assert.assertEquals("emitted value for 'ram' was ", getSD(ImmutableList.of(1, 1, 1)), keyValPair.getValue().get(ResourceType.RAM), 0);
@@ -184,7 +184,7 @@ public class CalculatorOperatorTest
Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
LOG.debug(o.toString());
- KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>) o;
+ KeyValPair<TimeBucketKey, Map<ResourceType, Double>> keyValPair = (KeyValPair<TimeBucketKey, Map<ResourceType, Double>>)o;
Assert.assertEquals("emitted value for 'cpu' was ", 3, keyValPair.getValue().get(ResourceType.CPU), 0);
Assert.assertEquals("emitted value for 'hdd' was ", 1, keyValPair.getValue().get(ResourceType.HDD), 0);
Assert.assertEquals("emitted value for 'ram' was ", 1, keyValPair.getValue().get(ResourceType.RAM), 0);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
index 9d9f31b..30d7281 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
@@ -18,6 +18,18 @@
*/
package com.datatorrent.demos.mobile;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StatsListener;
@@ -28,16 +40,6 @@ import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.apache.commons.lang3.Range;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Random;
/**
* Mobile Demo Application:
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
index 3b1e49d..8964d84 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java
@@ -18,18 +18,20 @@
*/
package com.datatorrent.demos.mobile;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
+import java.util.Map;
+import java.util.Random;
+import javax.validation.constraints.Min;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.constraints.Min;
-import java.util.Map;
-import java.util.Random;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
/**
* Generates mobile numbers that will be displayed in mobile demo just after launch.<br></br>
@@ -99,7 +101,8 @@ public class PhoneEntryOperator extends BaseOperator
public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();
@Override
- public void beginWindow(long windowId){
+ public void beginWindow(long windowId)
+ {
if (!seedGenerationDone) {
Random random = new Random();
int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
index 8db74cd..a46e6d4 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java
@@ -25,20 +25,20 @@ import java.util.Set;
import javax.validation.constraints.Min;
-import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.mutable.MutableLong;
+
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.HighLow;
@@ -73,8 +73,7 @@ public class PhoneMovementGenerator extends BaseOperator
if (delta >= threshold) {
if (state < 2) {
xloc++;
- }
- else {
+ } else {
xloc--;
}
if (xloc < 0) {
@@ -85,8 +84,7 @@ public class PhoneMovementGenerator extends BaseOperator
if (delta >= threshold) {
if ((state == 1) || (state == 3)) {
yloc++;
- }
- else {
+ } else {
yloc--;
}
if (yloc < 0) {
@@ -100,8 +98,7 @@ public class PhoneMovementGenerator extends BaseOperator
HighLow<Integer> nloc = newgps.get(tuple);
if (nloc == null) {
newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
- }
- else {
+ } else {
nloc.setHigh(xloc);
nloc.setLow(yloc);
}
@@ -109,7 +106,7 @@ public class PhoneMovementGenerator extends BaseOperator
}
};
- @InputPortFieldAnnotation(optional=true)
+ @InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
{
@Override
@@ -120,19 +117,16 @@ public class PhoneMovementGenerator extends BaseOperator
if (command != null) {
if (command.equals(COMMAND_ADD)) {
commandCounters.getCounter(CommandCounters.ADD).increment();
- String phoneStr= tuple.get(KEY_PHONE);
+ String phoneStr = tuple.get(KEY_PHONE);
registerPhone(phoneStr);
- }
- else if (command.equals(COMMAND_ADD_RANGE)) {
+ } else if (command.equals(COMMAND_ADD_RANGE)) {
commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
- }
- else if (command.equals(COMMAND_DELETE)) {
+ } else if (command.equals(COMMAND_DELETE)) {
commandCounters.getCounter(CommandCounters.DELETE).increment();
- String phoneStr= tuple.get(KEY_PHONE);
+ String phoneStr = tuple.get(KEY_PHONE);
deregisterPhone(phoneStr);
- }
- else if (command.equals(COMMAND_CLEAR)) {
+ } else if (command.equals(COMMAND_CLEAR)) {
commandCounters.getCounter(CommandCounters.CLEAR).increment();
clearPhones();
}
@@ -181,7 +175,7 @@ public class PhoneMovementGenerator extends BaseOperator
/**
* Sets the range of phone numbers for which the GPS locations need to be generated.
- *
+ *
* @param i the range of phone numbers to set
*/
public void setRange(int i)
@@ -190,7 +184,7 @@ public class PhoneMovementGenerator extends BaseOperator
}
/**
- * @return the threshold
+ * @return the threshold
*/
@Min(0)
public int getThreshold()
@@ -200,7 +194,7 @@ public class PhoneMovementGenerator extends BaseOperator
/**
* Sets the threshold that decides how frequently the GPS locations are updated.
- *
+ *
* @param i the value that decides how frequently the GPS locations change.
*/
public void setThreshold(int i)
@@ -217,8 +211,7 @@ public class PhoneMovementGenerator extends BaseOperator
try {
Integer phone = new Integer(phoneStr);
registerSinglePhone(phone);
- }
- catch (NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
LOG.warn("Invalid no {}", phoneStr);
}
}
@@ -239,8 +232,7 @@ public class PhoneMovementGenerator extends BaseOperator
for (int i = startPhone; i <= endPhone; i++) {
registerSinglePhone(i);
}
- }
- catch (NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
}
}
@@ -265,13 +257,13 @@ public class PhoneMovementGenerator extends BaseOperator
LOG.debug("Removing query id {}", phone);
emitPhoneRemoved(phone);
}
- }
- catch (NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
LOG.warn("Invalid phone {}", phoneStr);
}
}
- private void clearPhones() {
+ private void clearPhones()
+ {
phoneRegister.clear();
LOG.info("Clearing phones");
}
@@ -298,8 +290,7 @@ public class PhoneMovementGenerator extends BaseOperator
HighLow<Integer> loc = gps.get(e.getKey());
if (loc == null) {
gps.put(e.getKey(), e.getValue());
- }
- else {
+ } else {
loc.setHigh(e.getValue().getHigh());
loc.setLow(e.getValue().getLow());
}
@@ -316,7 +307,8 @@ public class PhoneMovementGenerator extends BaseOperator
context.setCounters(commandCounters);
}
- private void emitQueryResult(Integer phone) {
+ private void emitQueryResult(Integer phone)
+ {
HighLow<Integer> loc = gps.get(phone);
if (loc != null) {
Map<String, String> queryResult = new HashMap<String, String>();
@@ -328,7 +320,7 @@ public class PhoneMovementGenerator extends BaseOperator
private void emitPhoneRemoved(Integer phone)
{
- Map<String,String> removedResult= Maps.newHashMap();
+ Map<String,String> removedResult = Maps.newHashMap();
removedResult.put(KEY_PHONE, String.valueOf(phone));
removedResult.put(KEY_REMOVED,"true");
locationQueryResult.emit(removedResult);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
index 72d6514..87e40bf 100644
--- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
+++ b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
@@ -35,13 +35,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.api.LocalMode;
-
public class ApplicationTest
{
private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
@@ -65,7 +65,7 @@ public class ApplicationTest
contextHandler.addServlet(sh, "/pubsub");
contextHandler.addServlet(sh, "/*");
server.start();
- Connector connector[] = server.getConnectors();
+ Connector[] connector = server.getConnectors();
conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
@@ -111,7 +111,7 @@ public class ApplicationTest
server.stop();
Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5);
for (Object obj : sink.collectedTuples) {
- Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>) obj).get("phone"));
+ Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone"));
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
index 245d9c4..5625439 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java
@@ -20,20 +20,19 @@ package com.datatorrent.demos.mrmonitor;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.SeedEventGenerator;
-
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.SeedEventGenerator;
/**
* Application
*
* @since 2.0.0
*/
-@ApplicationAnnotation(name="MyFirstApplication")
+@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
index 2f3d651..7930405 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java
@@ -26,23 +26,23 @@ package com.datatorrent.demos.mrmonitor;
public interface Constants
{
- public final static int MAX_NUMBER_OF_JOBS = 25;
+ public static final int MAX_NUMBER_OF_JOBS = 25;
- public final static String REDUCE_TASK_TYPE = "REDUCE";
- public final static String MAP_TASK_TYPE = "MAP";
- public final static String TASK_TYPE = "type";
- public final static String TASK_ID = "id";
+ public static final String REDUCE_TASK_TYPE = "REDUCE";
+ public static final String MAP_TASK_TYPE = "MAP";
+ public static final String TASK_TYPE = "type";
+ public static final String TASK_ID = "id";
- public final static String LEAGACY_TASK_ID = "taskId";
- public final static int MAX_TASKS = 2000;
+ public static final String LEAGACY_TASK_ID = "taskId";
+ public static final int MAX_TASKS = 2000;
- public final static String QUERY_APP_ID = "app_id";
- public final static String QUERY_JOB_ID = "job_id";
- public final static String QUERY_HADOOP_VERSION = "hadoop_version";
- public final static String QUERY_API_VERSION = "api_version";
- public final static String QUERY_RM_PORT = "rm_port";
- public final static String QUERY_HS_PORT = "hs_port";
- public final static String QUERY_HOST_NAME = "hostname";
+ public static final String QUERY_APP_ID = "app_id";
+ public static final String QUERY_JOB_ID = "job_id";
+ public static final String QUERY_HADOOP_VERSION = "hadoop_version";
+ public static final String QUERY_API_VERSION = "api_version";
+ public static final String QUERY_RM_PORT = "rm_port";
+ public static final String QUERY_HS_PORT = "hs_port";
+ public static final String QUERY_HOST_NAME = "hostname";
public static final String QUERY_KEY_COMMAND = "command";
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
index 88863e2..263a1a7 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java
@@ -18,7 +18,11 @@
*/
package com.datatorrent.demos.mrmonitor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.codehaus.jettison.json.JSONArray;
@@ -107,8 +111,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
outputJsonObject.put("id", mrStatusObj.getJobId());
outputJsonObject.put("removed", "true");
output.emit(outputJsonObject.toString());
- }
- catch (JSONException e) {
+ } catch (JSONException e) {
LOG.warn("Error creating JSON: {}", e.getMessage());
}
return;
@@ -123,8 +126,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
}
if (mrStatusObj.getHadoopVersion() == 2) {
getJsonForJob(mrStatusObj);
- }
- else if (mrStatusObj.getHadoopVersion() == 1) {
+ } else if (mrStatusObj.getHadoopVersion() == 1) {
getJsonForLegacyJob(mrStatusObj);
}
mrStatusObj.setStatusHistoryCount(statusHistoryTime);
@@ -204,8 +206,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
if (jsonObj != null) {
if (statusObj.getMetricObject() == null) {
statusObj.setMetricObject(new TaskObject(jsonObj));
- }
- else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) {
+ } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) {
statusObj.getMetricObject().setJson(jsonObj);
statusObj.getMetricObject().setModified(true);
}
@@ -252,8 +253,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
continue;
}
reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj));
- }
- else {
+ } else {
if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) {
TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID));
if (tempTaskObj.getJsonString().equals(taskObj.toString())) {
@@ -269,8 +269,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
}
statusObj.setMapJsonObject(mapTaskOject);
statusObj.setReduceJsonObject(reduceTaskOject);
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.info("exception: {}", e.getMessage());
}
}
@@ -324,12 +323,11 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
{
try {
JSONObject jobJson = statusObj.getJsonObject();
- int totalTasks = ((JSONObject) ((JSONObject) jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
+ int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks");
Map<String, TaskObject> taskMap;
if (type.equalsIgnoreCase("map")) {
taskMap = statusObj.getMapJsonObject();
- }
- else {
+ } else {
taskMap = statusObj.getReduceJsonObject();
}
@@ -371,12 +369,10 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
if (type.equalsIgnoreCase("map")) {
statusObj.setMapJsonObject(taskMap);
- }
- else {
+ } else {
statusObj.setReduceJsonObject(taskMap);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.info(e.getMessage());
}
@@ -387,8 +383,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
{
try {
Thread.sleep(sleepTime);//
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
// If this thread was intrrupted by nother thread
}
if (!iterator.hasNext()) {
@@ -399,8 +394,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
MRStatusObject obj = iterator.next();
if (obj.getHadoopVersion() == 2) {
getJsonForJob(obj);
- }
- else if (obj.getHadoopVersion() == 1) {
+ } else if (obj.getHadoopVersion() == 1) {
getJsonForLegacyJob(obj);
}
}
@@ -465,8 +459,7 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
outputJsonObject.put("tasks", arr);
reduceOutput.emit(outputJsonObject.toString());
obj.setRetrials(0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.warn("error creating json {}", e.getMessage());
}
@@ -543,17 +536,14 @@ public class MRJobStatusOperator implements Operator, IdleTimeHandler
if (!modified) {
if (obj.getRetrials() >= maxRetrials) {
delList.add(obj.getJobId());
- }
- else {
+ } else {
obj.setRetrials(obj.getRetrials() + 1);
}
- }
- else {
+ } else {
obj.setRetrials(0);
}
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.warn("error creating json {}", ex.getMessage());
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
index 5758ad1..037378a 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java
@@ -20,10 +20,11 @@ package com.datatorrent.demos.mrmonitor;
import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
index f0471f3..481f3dc 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
@@ -149,7 +149,8 @@ public class MRStatusObject
virtualMemoryStatusHistory = new LinkedList<String>();
cpuStatusHistory = new LinkedList<String>();
statusScheduler = Executors.newScheduledThreadPool(1);
- statusScheduler.scheduleAtFixedRate(new Runnable() {
+ statusScheduler.scheduleAtFixedRate(new Runnable()
+ {
@Override
public void run()
{
@@ -333,12 +334,15 @@ public class MRStatusObject
@Override
public boolean equals(Object that)
{
- if (this == that)
+ if (this == that) {
return true;
- if (!(that instanceof MRStatusObject))
+ }
+ if (!(that instanceof MRStatusObject)) {
return false;
- if (this.hashCode() == that.hashCode())
+ }
+ if (this.hashCode() == that.hashCode()) {
return true;
+ }
return false;
}
@@ -443,7 +447,7 @@ public class MRStatusObject
/**
* This returns the task information as json
- *
+ *
* @return
*/
public JSONObject getJson()
@@ -453,7 +457,7 @@ public class MRStatusObject
/**
* This stores the task information as json
- *
+ *
* @param json
*/
public void setJson(JSONObject json)
@@ -463,7 +467,7 @@ public class MRStatusObject
/**
* This returns if the json object has been modified
- *
+ *
* @return
*/
public boolean isModified()
@@ -473,7 +477,7 @@ public class MRStatusObject
/**
* This sets if the json object is modified
- *
+ *
* @param modified
*/
public void setModified(boolean modified)
@@ -483,7 +487,7 @@ public class MRStatusObject
/**
* This returns the string format of the json object
- *
+ *
* @return
*/
public String getJsonString()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
index cb10347..0d7f6af 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
@@ -20,15 +20,16 @@ package com.datatorrent.demos.mrmonitor;
import java.io.IOException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* <p>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
index e37454f..5075163 100644
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
+++ b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
@@ -33,7 +33,8 @@ import com.datatorrent.api.Operator;
public class MapToMRObjectOperator implements Operator
{
- public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>() {
+ public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
+ {
@Override
public void process(Map<String, String> tuple)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
index 70cf840..ad8de02 100644
--- a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
+++ b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
@@ -28,9 +28,8 @@ import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
-
import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
/**
* <p>MapReduceDebuggerApplicationTest class.</p>
@@ -53,7 +52,7 @@ public class MrMonitoringApplicationTest
contextHandler.addServlet(sh, "/pubsub");
contextHandler.addServlet(sh, "/*");
server.start();
- Connector connector[] = server.getConnectors();
+ Connector[] connector = server.getConnectors();
conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
MRMonitoringApplication application = new MRMonitoringApplication();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
index 8ecf76e..5dbd83f 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
@@ -18,11 +18,11 @@
*/
package com.datatorrent.demos.mroperator;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import org.apache.hadoop.io.WritableComparable;
@@ -33,44 +33,48 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class DateWritable implements WritableComparable<DateWritable>
{
- private final static SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
- private Date date;
+ private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
+ private Date date;
+
+ public Date getDate()
+ {
+ return date;
+ }
+
+ public void setDate( Date date )
+ {
+ this.date = date;
+ }
- public Date getDate()
- {
- return date;
- }
+ public void readFields( DataInput in ) throws IOException
+ {
+ date = new Date( in.readLong() );
+ }
- public void setDate( Date date )
- {
- this.date = date;
- }
+ public void write( DataOutput out ) throws IOException
+ {
+ out.writeLong( date.getTime() );
+ }
- public void readFields( DataInput in ) throws IOException
- {
- date = new Date( in.readLong() );
- }
+ @Override
+ public boolean equals(Object o)
+ {
+ return toString().equals(o.toString());
+ }
- public void write( DataOutput out ) throws IOException
- {
- out.writeLong( date.getTime() );
- }
+ @Override
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
- @Override
- public boolean equals(Object o){
- return toString().equals(o.toString());
- }
- @Override
- public int hashCode(){
- return toString().hashCode();
- }
- public String toString()
- {
- return formatter.format( date);
- }
+ public String toString()
+ {
+ return formatter.format( date);
+ }
- public int compareTo( DateWritable other )
- {
- return date.compareTo( other.getDate() );
- }
+ public int compareTo( DateWritable other )
+ {
+ return date.compareTo( other.getDate() );
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
index b6b9735..c4b9c49 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
@@ -36,6 +36,6 @@ public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOper
@Override
public byte[] getBytesForTuple(KeyHashValPair<K,V> t)
{
- return (t.toString()+"\n").getBytes();
+ return (t.toString() + "\n").getBytes();
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
index dae07a2..076b8ac 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.9.0
*/
-@ApplicationAnnotation(name="InvertedIndexDemo")
+@ApplicationAnnotation(name = "InvertedIndexDemo")
public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
index aabea81..e963954 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
@@ -41,18 +41,18 @@ import org.apache.hadoop.mapred.Reporter;
*
* @since 0.9.0
*/
-public class LineIndexer {
+public class LineIndexer
+{
public static class LineIndexMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, Text> {
-
- private final static Text word = new Text();
- private final static Text location = new Text();
+ implements Mapper<LongWritable, Text, Text, Text>
+ {
+ private static final Text word = new Text();
+ private static final Text location = new Text();
public void map(LongWritable key, Text val,
- OutputCollector<Text, Text> output, Reporter reporter)
- throws IOException {
-
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+ {
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
location.set(fileName);
@@ -69,18 +69,18 @@ public class LineIndexer {
public static class LineIndexReducer extends MapReduceBase
- implements Reducer<Text, Text, Text, Text> {
-
+ implements Reducer<Text, Text, Text, Text>
+ {
public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter)
- throws IOException {
-
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+ {
boolean first = true;
StringBuilder toReturn = new StringBuilder();
- while (values.hasNext()){
- if (!first)
+ while (values.hasNext()) {
+ if (!first) {
toReturn.append(", ");
- first=false;
+ }
+ first = false;
toReturn.append(values.next().toString());
}
@@ -93,7 +93,8 @@ public class LineIndexer {
* The actual main() method for our program; this is the
* "driver" for the MapReduce job.
*/
- public static void main(String[] args) {
+ public static void main(String[] args)
+ {
JobClient client = new JobClient();
JobConf conf = new JobConf(LineIndexer.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
index 793ad4d..69ee892 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
@@ -18,201 +18,170 @@
*/
package com.datatorrent.demos.mroperator;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Iterator;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Iterator;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* <p>LogCountsPerHour class.</p>
*
* @since 0.9.0
*/
-public class LogCountsPerHour extends Configured implements Tool {
+public class LogCountsPerHour extends Configured implements Tool
+{
- public static class LogMapClass extends MapReduceBase
- implements Mapper<LongWritable, Text, DateWritable, IntWritable>
- {
- private DateWritable date = new DateWritable();
- private final static IntWritable one = new IntWritable( 1 );
-
- public void map( LongWritable key, // Offset into the file
- Text value,
- OutputCollector<DateWritable, IntWritable> output,
- Reporter reporter) throws IOException
- {
- // Get the value as a String; it is of the format:
- // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
- String text = value.toString();
-
- // Get the date and time
- int openBracket = text.indexOf( '[' );
- int closeBracket = text.indexOf( ']' );
- if( openBracket != -1 && closeBracket != -1 )
- {
- // Read the date
- String dateString = text.substring( text.indexOf( '[' ) + 1, text.indexOf( ']' ) );
-
- // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
- int index = 0;
- int nextIndex = dateString.indexOf( '/' );
- int day = Integer.parseInt( dateString.substring(index, nextIndex) );
-
- index = nextIndex;
- nextIndex = dateString.indexOf( '/', index+1 );
- String month = dateString.substring( index+1, nextIndex );
-
- index = nextIndex;
- nextIndex = dateString.indexOf( ':', index );
- int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
- index = nextIndex;
- nextIndex = dateString.indexOf( ':', index+1 );
- int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
- // Build a calendar object for this date
- Calendar calendar = Calendar.getInstance();
- calendar.set( Calendar.DATE, day );
- calendar.set( Calendar.YEAR, year );
- calendar.set( Calendar.HOUR, hour );
- calendar.set( Calendar.MINUTE, 0 );
- calendar.set( Calendar.SECOND, 0 );
- calendar.set( Calendar.MILLISECOND, 0 );
-
- if( month.equalsIgnoreCase( "dec" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.DECEMBER );
- }
- else if( month.equalsIgnoreCase( "nov" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.NOVEMBER );
- }
- else if( month.equalsIgnoreCase( "oct" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.OCTOBER );
- }
- else if( month.equalsIgnoreCase( "sep" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.SEPTEMBER );
- }
- else if( month.equalsIgnoreCase( "aug" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.AUGUST );
- }
- else if( month.equalsIgnoreCase( "jul" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.JULY );
- }
- else if( month.equalsIgnoreCase( "jun" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.JUNE );
- }
- else if( month.equalsIgnoreCase( "may" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.MAY );
- }
- else if( month.equalsIgnoreCase( "apr" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.APRIL );
- }
- else if( month.equalsIgnoreCase( "mar" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.MARCH );
- }
- else if( month.equalsIgnoreCase( "feb" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.FEBRUARY );
- }
- else if( month.equalsIgnoreCase( "jan" ) )
- {
- calendar.set( Calendar.MONTH, Calendar.JANUARY );
- }
-
-
- // Output the date as the key and 1 as the value
- date.setDate( calendar.getTime() );
- output.collect(date, one);
- }
- }
- }
+ public static class LogMapClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, DateWritable, IntWritable>
+ {
+ private DateWritable date = new DateWritable();
+ private static final IntWritable one = new IntWritable(1);
- public static class LogReduce extends MapReduceBase
- implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+ public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
{
- public void reduce( DateWritable key, Iterator<IntWritable> values,
- OutputCollector<DateWritable, IntWritable> output,
- Reporter reporter) throws IOException
- {
- // Iterate over all of the values (counts of occurrences of this word)
- int count = 0;
- while( values.hasNext() )
- {
- // Add the value to our count
- count += values.next().get();
- }
-
- // Output the word with its count (wrapped in an IntWritable)
- output.collect( key, new IntWritable( count ) );
+ // Get the value as a String; it is of the format:
+ // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
+ String text = value.toString();
+
+ // Get the date and time
+ int openBracket = text.indexOf('[');
+ int closeBracket = text.indexOf(']');
+ if (openBracket != -1 && closeBracket != -1) {
+ // Read the date
+ String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
+
+ // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
+ int index = 0;
+ int nextIndex = dateString.indexOf('/');
+ int day = Integer.parseInt(dateString.substring(index, nextIndex));
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf('/', index + 1);
+ String month = dateString.substring(index + 1, nextIndex);
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf(':', index);
+ int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+ index = nextIndex;
+ nextIndex = dateString.indexOf(':', index + 1);
+ int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
+
+ // Build a calendar object for this date
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.DATE, day);
+ calendar.set(Calendar.YEAR, year);
+ calendar.set(Calendar.HOUR, hour);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+
+ if (month.equalsIgnoreCase("dec")) {
+ calendar.set(Calendar.MONTH, Calendar.DECEMBER);
+ } else if (month.equalsIgnoreCase("nov")) {
+ calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
+ } else if (month.equalsIgnoreCase("oct")) {
+ calendar.set(Calendar.MONTH, Calendar.OCTOBER);
+ } else if (month.equalsIgnoreCase("sep")) {
+ calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
+ } else if (month.equalsIgnoreCase("aug")) {
+ calendar.set(Calendar.MONTH, Calendar.AUGUST);
+ } else if (month.equalsIgnoreCase("jul")) {
+ calendar.set(Calendar.MONTH, Calendar.JULY);
+ } else if (month.equalsIgnoreCase("jun")) {
+ calendar.set(Calendar.MONTH, Calendar.JUNE);
+ } else if (month.equalsIgnoreCase("may")) {
+ calendar.set(Calendar.MONTH, Calendar.MAY);
+ } else if (month.equalsIgnoreCase("apr")) {
+ calendar.set(Calendar.MONTH, Calendar.APRIL);
+ } else if (month.equalsIgnoreCase("mar")) {
+ calendar.set(Calendar.MONTH, Calendar.MARCH);
+ } else if (month.equalsIgnoreCase("feb")) {
+ calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
+ } else if (month.equalsIgnoreCase("jan")) {
+ calendar.set(Calendar.MONTH, Calendar.JANUARY);
}
- }
- public int run(String[] args) throws Exception
- {
- // Create a configuration
- Configuration conf = getConf();
-
- // Create a job from the default configuration that will use the WordCount class
- JobConf job = new JobConf( conf, LogCountsPerHour.class );
-
- // Define our input path as the first command line argument and our output path as the second
- Path in = new Path( args[0] );
- Path out = new Path( args[1] );
-
- // Create File Input/Output formats for these paths (in the job)
- FileInputFormat.setInputPaths( job, in );
- FileOutputFormat.setOutputPath( job, out );
-
- // Configure the job: name, mapper, reducer, and combiner
- job.setJobName( "LogAveragePerHour" );
- job.setMapperClass( LogMapClass.class );
- job.setReducerClass( LogReduce.class );
- job.setCombinerClass( LogReduce.class );
-
- // Configure the output
- job.setOutputFormat( TextOutputFormat.class );
- job.setOutputKeyClass( DateWritable.class );
- job.setOutputValueClass( IntWritable.class );
-
- // Run the job
- JobClient.runJob(job);
- return 0;
+ // Output the date as the key and 1 as the value
+ date.setDate(calendar.getTime());
+ output.collect(date, one);
+ }
}
+ }
- public static void main(String[] args) throws Exception
+ public static class LogReduce extends MapReduceBase
+ implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
+ {
+ public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
{
- // Start the LogCountsPerHour MapReduce application
- int res = ToolRunner.run( new Configuration(),
- new LogCountsPerHour(),
- args );
- System.exit( res );
+ // Iterate over all of the values (counts of occurrences of this word)
+ int count = 0;
+ while (values.hasNext()) {
+ // Add the value to our count
+ count += values.next().get();
+ }
+
+ // Output the word with its count (wrapped in an IntWritable)
+ output.collect(key, new IntWritable(count));
}
+ }
+
+
+ public int run(String[] args) throws Exception
+ {
+ // Create a configuration
+ Configuration conf = getConf();
+
+ // Create a job from the default configuration that will use the WordCount class
+ JobConf job = new JobConf(conf, LogCountsPerHour.class);
+
+ // Define our input path as the first command line argument and our output path as the second
+ Path in = new Path(args[0]);
+ Path out = new Path(args[1]);
+
+ // Create File Input/Output formats for these paths (in the job)
+ FileInputFormat.setInputPaths(job, in);
+ FileOutputFormat.setOutputPath(job, out);
+
+ // Configure the job: name, mapper, reducer, and combiner
+ job.setJobName("LogAveragePerHour");
+ job.setMapperClass(LogMapClass.class);
+ job.setReducerClass(LogReduce.class);
+ job.setCombinerClass(LogReduce.class);
+
+ // Configure the output
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(DateWritable.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ // Run the job
+ JobClient.runJob(job);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ // Start the LogCountsPerHour MapReduce application
+ int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
+ System.exit(res);
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
index cbe5566..2d647ed 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
@@ -30,7 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.9.0
*/
-@ApplicationAnnotation(name="LogsCountDemo")
+@ApplicationAnnotation(name = "LogsCountDemo")
public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
index b8023f5..509f6ae 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
@@ -22,18 +22,35 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import javax.validation.constraints.Min;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
@@ -123,8 +140,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
if (reader == null) {
try {
reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter);
- }
- catch (IOException e) {
+ } catch (IOException e) {
logger.info("error getting record reader {}", e.getMessage());
}
}
@@ -150,11 +166,10 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
SerializationFactory serializationFactory = new SerializationFactory(conf);
Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass);
keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray()));
- inputSplit = (InputSplit) keyDesiralizer.deserialize(null);
- ((ReporterImpl) reporter).setInputSplit(inputSplit);
+ inputSplit = (InputSplit)keyDesiralizer.deserialize(null);
+ ((ReporterImpl)reporter).setInputSplit(inputSplit);
reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter);
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.info("failed to initialize inputformat obj {}", inputFormat);
throw new RuntimeException(e);
}
@@ -172,8 +187,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
if (mapClass != null) {
try {
mapObject = mapClass.newInstance();
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.info("can't instantiate object {}", e.getMessage());
}
@@ -182,8 +196,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
if (combineClass != null) {
try {
combineObject = combineClass.newInstance();
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.info("can't instantiate object {}", e.getMessage());
}
combineObject.configure(jobConf);
@@ -202,15 +215,14 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val);
mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter);
if (combineObject == null) {
- List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
+ List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
for (KeyHashValPair<K2, V2> e : list) {
output.emit(e);
}
list.clear();
}
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
logger.debug(ex.toString());
throw new RuntimeException(ex);
}
@@ -220,7 +232,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
@Override
public void endWindow()
{
- List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
+ List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
if (combineObject != null) {
Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>();
for (KeyHashValPair<K2, V2> tuple : list) {
@@ -229,8 +241,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
cacheList = new ArrayList<V2>();
cacheList.add(tuple.getValue());
cacheObject.put(tuple.getKey(), cacheList);
- }
- else {
+ } else {
cacheList.add(tuple.getValue());
}
}
@@ -239,12 +250,11 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) {
try {
combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter);
- }
- catch (IOException e1) {
+ } catch (IOException e1) {
logger.info(e1.getMessage());
}
}
- list = ((OutputCollectorImpl<K2, V2>) tempOutputCollector).getList();
+ list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList();
for (KeyHashValPair<K2, V2> e : list) {
output.emit(e);
}
@@ -261,14 +271,13 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
{
FileInputFormat.setInputPaths(conf, new Path(path));
if (inputFormat == null) {
- inputFormat = inputFormatClass.newInstance();
- String inputFormatClassName = inputFormatClass.getName();
- if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
- ((TextInputFormat) inputFormat).configure(conf);
- }
- else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
- ((KeyValueTextInputFormat) inputFormat).configure(conf);
- }
+ inputFormat = inputFormatClass.newInstance();
+ String inputFormatClassName = inputFormatClass.getName();
+ if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
+ ((TextInputFormat)inputFormat).configure(conf);
+ } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
+ ((KeyValueTextInputFormat)inputFormat).configure(conf);
+ }
}
return inputFormat.getSplits(conf, numSplits);
// return null;
@@ -296,8 +305,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
InputSplit[] splits;
try {
splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
- }
- catch (Exception e1) {
+ } catch (Exception e1) {
logger.info(" can't get splits {}", e1.getMessage());
throw new RuntimeException(e1);
}
@@ -316,8 +324,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
keySerializer.open(opr.getOutstream());
keySerializer.serialize(splits[size - 1]);
opr.setInputSplitClass(splits[size - 1].getClass());
- }
- catch (IOException e) {
+ } catch (IOException e) {
logger.info("error while serializing {}", e.getMessage());
}
size--;
@@ -333,8 +340,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
keySerializer.open(opr.getOutstream());
keySerializer.serialize(splits[size - 1]);
opr.setInputSplitClass(splits[size - 1].getClass());
- }
- catch (IOException e) {
+ } catch (IOException e) {
logger.info("error while serializing {}", e.getMessage());
}
size--;
@@ -342,8 +348,7 @@ public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<
}
try {
keySerializer.close();
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
return operList;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
index 45f9005..b0ea7d8 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
@@ -30,15 +30,15 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.9.0
*/
-@ApplicationAnnotation(name="WordCountDemo")
-public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable> {
-
- public void NewWordCountApplication() {
- setMapClass(WordCount.Map.class);
- setReduceClass(WordCount.Reduce.class);
- setCombineClass(WordCount.Reduce.class);
- setInputFormat(TextInputFormat.class);
-
- }
+@ApplicationAnnotation(name = "WordCountDemo")
+public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
+{
+ public void NewWordCountApplication()
+ {
+ setMapClass(WordCount.Map.class);
+ setReduceClass(WordCount.Reduce.class);
+ setCombineClass(WordCount.Reduce.class);
+ setInputFormat(TextInputFormat.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
index b380553..6c81724 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java
@@ -24,14 +24,14 @@ import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.OutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.datatorrent.lib.util.KeyHashValPair;
/**
@@ -40,50 +40,55 @@ import com.datatorrent.lib.util.KeyHashValPair;
* @since 0.9.0
*/
@SuppressWarnings("unchecked")
-public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> {
- private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
+public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V>
+{
+ private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class);
- private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>();
+ private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>();
- public List<KeyHashValPair<K, V>> getList() {
- return list;
- }
+ public List<KeyHashValPair<K, V>> getList()
+ {
+ return list;
+ }
- private transient SerializationFactory serializationFactory;
- private transient Configuration conf = null;
+ private transient SerializationFactory serializationFactory;
+ private transient Configuration conf = null;
- public OutputCollectorImpl() {
- conf = new Configuration();
- serializationFactory = new SerializationFactory(conf);
+ public OutputCollectorImpl()
+ {
+ conf = new Configuration();
+ serializationFactory = new SerializationFactory(conf);
- }
+ }
- private <T> T cloneObj(T t) throws IOException {
- Serializer<T> keySerializer;
- Class<T> keyClass;
- PipedInputStream pis = new PipedInputStream();
- PipedOutputStream pos = new PipedOutputStream(pis);
- keyClass = (Class<T>) t.getClass();
- keySerializer = serializationFactory.getSerializer(keyClass);
- keySerializer.open(pos);
- keySerializer.serialize(t);
- Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass);
- keyDesiralizer.open(pis);
- T clonedArg0 = keyDesiralizer.deserialize(null);
- pos.close();
- pis.close();
- keySerializer.close();
- keyDesiralizer.close();
- return clonedArg0;
+ private <T> T cloneObj(T t) throws IOException
+ {
+ Serializer<T> keySerializer;
+ Class<T> keyClass;
+ PipedInputStream pis = new PipedInputStream();
+ PipedOutputStream pos = new PipedOutputStream(pis);
+ keyClass = (Class<T>)t.getClass();
+ keySerializer = serializationFactory.getSerializer(keyClass);
+ keySerializer.open(pos);
+ keySerializer.serialize(t);
+ Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass);
+ keyDesiralizer.open(pis);
+ T clonedArg0 = keyDesiralizer.deserialize(null);
+ pos.close();
+ pis.close();
+ keySerializer.close();
+ keyDesiralizer.close();
+ return clonedArg0;
- }
+ }
- @Override
- public void collect(K arg0, V arg1) throws IOException {
- if (conf == null) {
- conf = new Configuration();
- serializationFactory = new SerializationFactory(conf);
- }
- list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
- }
+ @Override
+ public void collect(K arg0, V arg1) throws IOException
+ {
+ if (conf == null) {
+ conf = new Configuration();
+ serializationFactory = new SerializationFactory(conf);
+ }
+ list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1)));
+ }
}