You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:20 UTC
[2/6] apex-malhar git commit: Fixed checkstyle errors for demos.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
index 32c7ccb..5df9b0d 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java
@@ -25,14 +25,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
@@ -47,129 +48,142 @@ import com.datatorrent.lib.util.KeyHashValPair;
* @since 0.9.0
*/
@SuppressWarnings({ "deprecation", "unused" })
-public class ReduceOperator<K1, V1, K2, V2> implements Operator {
- private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
-
- private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
- private transient Reducer<K1, V1, K2, V2> reduceObj;
- private transient Reporter reporter;
- private OutputCollector<K2, V2> outputCollector;
- private String configFile;
-
- public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() {
- return reduceClass;
- }
-
- public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass) {
- this.reduceClass = reduceClass;
- }
-
- public String getConfigFile() {
- return configFile;
- }
-
- public void setConfigFile(String configFile) {
- this.configFile = configFile;
- }
-
- private int numberOfMappersRunning = -1;
- private int operatorId;
-
- public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() {
- @Override
- public void process(KeyHashValPair<Integer, Integer> tuple) {
- logger.info("processing {}", tuple);
- if (numberOfMappersRunning == -1)
- numberOfMappersRunning = tuple.getValue();
- else
- numberOfMappersRunning += tuple.getValue();
-
- }
-
- };
-
- public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
- private Map<K1, List<V1>> cacheObject;
- public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() {
-
- @Override
- public void process(KeyHashValPair<K1, V1> tuple) {
- // logger.info("processing tupple {}",tuple);
- List<V1> list = cacheObject.get(tuple.getKey());
- if (list == null) {
- list = new ArrayList<V1>();
- list.add(tuple.getValue());
- cacheObject.put(tuple.getKey(), list);
- } else {
- list.add(tuple.getValue());
- }
- }
-
- };
-
- @Override
- public void setup(OperatorContext context) {
- reporter = new ReporterImpl(ReporterType.Reducer, new Counters());
- if(context != null){
- operatorId = context.getId();
- }
- cacheObject = new HashMap<K1, List<V1>>();
- outputCollector = new OutputCollectorImpl<K2, V2>();
- if (reduceClass != null) {
- try {
- reduceObj = reduceClass.newInstance();
- } catch (Exception e) {
- logger.info("can't instantiate object {}", e.getMessage());
- throw new RuntimeException(e);
- }
- Configuration conf = new Configuration();
- InputStream stream = null;
- if (configFile != null && configFile.length() > 0) {
- logger.info("system /{}", configFile);
- stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
- if (stream == null) {
- logger.info("system {}", configFile);
- stream = ClassLoader.getSystemResourceAsStream(configFile);
- }
- }
- if (stream != null) {
- logger.info("found our stream... so adding it");
- conf.addResource(stream);
- }
- reduceObj.configure(new JobConf(conf));
- }
-
- }
-
- @Override
- public void teardown() {
-
- }
-
- @Override
- public void beginWindow(long windowId) {
-
- }
-
- @Override
- public void endWindow() {
- if (numberOfMappersRunning == 0) {
- for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
- try {
- reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
- } catch (IOException e1) {
- logger.info(e1.getMessage());
- throw new RuntimeException(e1);
- }
- }
- List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList();
- for (KeyHashValPair<K2, V2> e : list) {
- output.emit(e);
- }
- list.clear();
- cacheObject.clear();
- numberOfMappersRunning = -1;
- }
- }
+public class ReduceOperator<K1, V1, K2, V2> implements Operator
+{
+ private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class);
+
+ private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass;
+ private transient Reducer<K1, V1, K2, V2> reduceObj;
+ private transient Reporter reporter;
+ private OutputCollector<K2, V2> outputCollector;
+ private String configFile;
+
+ public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass()
+ {
+ return reduceClass;
+ }
+
+ public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass)
+ {
+ this.reduceClass = reduceClass;
+ }
+
+ public String getConfigFile()
+ {
+ return configFile;
+ }
+
+ public void setConfigFile(String configFile)
+ {
+ this.configFile = configFile;
+ }
+
+ private int numberOfMappersRunning = -1;
+ private int operatorId;
+
+ public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>()
+ {
+ @Override
+ public void process(KeyHashValPair<Integer, Integer> tuple)
+ {
+ logger.info("processing {}", tuple);
+ if (numberOfMappersRunning == -1) {
+ numberOfMappersRunning = tuple.getValue();
+ } else {
+ numberOfMappersRunning += tuple.getValue();
+ }
+
+ }
+
+ };
+
+ public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
+ private Map<K1, List<V1>> cacheObject;
+ public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>()
+ {
+ @Override
+ public void process(KeyHashValPair<K1, V1> tuple)
+ {
+ // logger.info("processing tupple {}",tuple);
+ List<V1> list = cacheObject.get(tuple.getKey());
+ if (list == null) {
+ list = new ArrayList<V1>();
+ list.add(tuple.getValue());
+ cacheObject.put(tuple.getKey(), list);
+ } else {
+ list.add(tuple.getValue());
+ }
+ }
+
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ reporter = new ReporterImpl(ReporterType.Reducer, new Counters());
+ if (context != null) {
+ operatorId = context.getId();
+ }
+ cacheObject = new HashMap<K1, List<V1>>();
+ outputCollector = new OutputCollectorImpl<K2, V2>();
+ if (reduceClass != null) {
+ try {
+ reduceObj = reduceClass.newInstance();
+ } catch (Exception e) {
+ logger.info("can't instantiate object {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ Configuration conf = new Configuration();
+ InputStream stream = null;
+ if (configFile != null && configFile.length() > 0) {
+ logger.info("system /{}", configFile);
+ stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
+ if (stream == null) {
+ logger.info("system {}", configFile);
+ stream = ClassLoader.getSystemResourceAsStream(configFile);
+ }
+ }
+ if (stream != null) {
+ logger.info("found our stream... so adding it");
+ conf.addResource(stream);
+ }
+ reduceObj.configure(new JobConf(conf));
+ }
+
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (numberOfMappersRunning == 0) {
+ for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) {
+ try {
+ reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter);
+ } catch (IOException e1) {
+ logger.info(e1.getMessage());
+ throw new RuntimeException(e1);
+ }
+ }
+ List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
+ for (KeyHashValPair<K2, V2> e : list) {
+ output.emit(e);
+ }
+ list.clear();
+ cacheObject.clear();
+ numberOfMappersRunning = -1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
index 1eb3bdd..d2d38da 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java
@@ -18,8 +18,8 @@
*/
package com.datatorrent.demos.mroperator;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
@@ -28,81 +28,92 @@ import org.apache.hadoop.mapred.Reporter;
*
* @since 0.9.0
*/
-public class ReporterImpl implements Reporter {
-
- private Counters counters;
- InputSplit inputSplit;
-
- public enum ReporterType {
- Mapper, Reducer
- }
-
- private ReporterType typ;
-
- public ReporterImpl(final ReporterType kind, final Counters ctrs) {
- this.typ = kind;
- this.counters = ctrs;
- }
-
- @Override
- public InputSplit getInputSplit() {
- if (typ == ReporterType.Reducer) {
- throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
- } else {
- return inputSplit;
- }
- }
-
- public void setInputSplit(InputSplit inputSplit) {
- this.inputSplit = inputSplit;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
- if (null != counters) {
- counters.incrCounter(key, amount);
- }
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
- if (null != counters) {
- counters.incrCounter(group, counter, amount);
- }
- }
-
- @Override
- public void setStatus(String status) {
- // do nothing.
- }
-
- @Override
- public void progress() {
- // do nothing.
- }
-
- @Override
- public Counter getCounter(String group, String name) {
- Counters.Counter counter = null;
- if (counters != null) {
- counter = counters.findCounter(group, name);
- }
-
- return counter;
- }
-
- @Override
- public Counter getCounter(Enum<?> key) {
- Counters.Counter counter = null;
- if (counters != null) {
- counter = counters.findCounter(key);
- }
-
- return counter;
- }
-
- public float getProgress() {
- return 0;
- }
+public class ReporterImpl implements Reporter
+{
+ private Counters counters;
+ InputSplit inputSplit;
+
+ public enum ReporterType
+ {
+ Mapper, Reducer
+ }
+
+ private ReporterType typ;
+
+ public ReporterImpl(final ReporterType kind, final Counters ctrs)
+ {
+ this.typ = kind;
+ this.counters = ctrs;
+ }
+
+ @Override
+ public InputSplit getInputSplit()
+ {
+ if (typ == ReporterType.Reducer) {
+ throw new UnsupportedOperationException("Reducer cannot call getInputSplit()");
+ } else {
+ return inputSplit;
+ }
+ }
+
+ public void setInputSplit(InputSplit inputSplit)
+ {
+ this.inputSplit = inputSplit;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount)
+ {
+ if (null != counters) {
+ counters.incrCounter(key, amount);
+ }
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount)
+ {
+ if (null != counters) {
+ counters.incrCounter(group, counter, amount);
+ }
+ }
+
+ @Override
+ public void setStatus(String status)
+ {
+ // do nothing.
+ }
+
+ @Override
+ public void progress()
+ {
+ // do nothing.
+ }
+
+ @Override
+ public Counter getCounter(String group, String name)
+ {
+ Counters.Counter counter = null;
+ if (counters != null) {
+ counter = counters.findCounter(group, name);
+ }
+
+ return counter;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> key)
+ {
+ Counters.Counter counter = null;
+ if (counters != null) {
+ counter = counters.findCounter(key);
+ }
+
+ return counter;
+ }
+
+ public float getProgress()
+ {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
index d5cbdb0..f78cf99 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java
@@ -19,13 +19,24 @@
package com.datatorrent.demos.mroperator;
import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
/**
* <p>WordCount class.</p>
@@ -38,7 +49,7 @@ public class WordCount
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
- private final static IntWritable one = new IntWritable(1);
+ private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
index e8c71c3..0f330e8 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java
@@ -23,6 +23,15 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -35,13 +44,6 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -118,8 +120,7 @@ public class MapOperatorTest
testDir = baseDir + "/" + methodName;
try {
FileUtils.forceMkdir(new File(testDir));
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n");
@@ -131,16 +132,13 @@ public class MapOperatorTest
try {
output = new BufferedWriter(new FileWriter(new File(fileName)));
output.write(data);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
- }
- finally {
+ } finally {
if (output != null) {
try {
output.close();
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
LOG.error("not able to close the output stream: ", ex);
}
}
@@ -152,8 +150,7 @@ public class MapOperatorTest
{
try {
FileUtils.deleteDirectory(new File(baseDir));
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
index 9ad5637..b85f8ad 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java
@@ -18,55 +18,57 @@
*/
package com.datatorrent.demos.mroperator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyHashValPair;
-public class ReduceOperatorTest {
-
- private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
-
- /**
- * Test node logic emits correct results
- */
- @Test
- public void testNodeProcessing() throws Exception {
- testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
- }
+public class ReduceOperatorTest
+{
+ private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) {
+ /**
+ * Test node logic emits correct results
+ */
+ @Test
+ public void testNodeProcessing() throws Exception
+ {
+ testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>());
+ }
- oper.setReduceClass(WordCount.Reduce.class);
- oper.setConfigFile(null);
- oper.setup(null);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper)
+ {
+ oper.setReduceClass(WordCount.Reduce.class);
+ oper.setConfigFile(null);
+ oper.setup(null);
- CollectorTestSink sortSink = new CollectorTestSink();
+ CollectorTestSink sortSink = new CollectorTestSink();
oper.output.setSink(sortSink);
- oper.beginWindow(0);
- oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1));
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
- oper.endWindow();
+ oper.beginWindow(0);
+ oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.endWindow();
- oper.beginWindow(1);
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
- oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
- oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1));
- oper.endWindow();
- Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
- for (Object o : sortSink.collectedTuples) {
+ oper.beginWindow(1);
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1)));
+ oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1));
+ oper.endWindow();
+ Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size());
+ for (Object o : sortSink.collectedTuples) {
logger.debug(o.toString());
}
logger.debug("Done testing round\n");
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
index cb1521a..bd732c1 100644
--- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
+++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java
@@ -23,14 +23,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
import com.google.common.collect.Maps;
import com.datatorrent.api.LocalMode;
@@ -58,7 +59,7 @@ public class WordCountMRApplicationTest
List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt"));
Map<String,Integer> readMap = Maps.newHashMap();
Iterator<String> itr = readLines.iterator();
- while(itr.hasNext()){
+ while (itr.hasNext()) {
String[] splits = itr.next().split("=");
readMap.put(splits[0],Integer.valueOf(splits[1]));
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
index 8f4dd92..55ffe92 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
@@ -20,13 +20,12 @@ package com.datatorrent.demos.pi;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
/**
* Monte Carlo PI estimation demo : <br>
@@ -75,7 +74,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PiDemo")
+@ApplicationAnnotation(name = "PiDemo")
public class Application implements StreamingApplication
{
private final Locality locality = null;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
index 57c5249..328bb10 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
@@ -84,7 +84,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PiDemoAppData")
+@ApplicationAnnotation(name = "PiDemoAppData")
public class ApplicationAppData implements StreamingApplication
{
public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
index 3ed376f..0796608 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java
@@ -18,16 +18,15 @@
*/
package com.datatorrent.demos.pi;
-
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.script.JavaScriptOperator;
import com.datatorrent.lib.stream.RoundRobinHashMap;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
* Monte Carlo PI estimation demo : <br>
@@ -78,7 +77,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PiJavaScriptDemo")
+@ApplicationAnnotation(name = "PiJavaScriptDemo")
public class ApplicationWithScript implements StreamingApplication
{
@@ -92,13 +91,13 @@ public class ApplicationWithScript implements StreamingApplication
rand.setMaxvalue(maxValue);
RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
- rrhm.setKeys(new String[] { "x", "y" });
+ rrhm.setKeys(new String[]{"x", "y"});
JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator());
calc.setPassThru(false);
calc.put("i",0);
calc.put("count",0);
- calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }");
+ calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }");
calc.setInvoke("pi");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
index 9363b88..221ecc0 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java
@@ -40,7 +40,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PiLibraryDemo")
+@ApplicationAnnotation(name = "PiLibraryDemo")
public class Calculator implements StreamingApplication
{
@Override
@@ -56,7 +56,7 @@ public class Calculator implements StreamingApplication
AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>());
Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>());
LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>());
- comparator.setConstant(30000 *30000);
+ comparator.setConstant(30000 * 30000);
Counter inCircle = dag.addOperator("CountInCircle", Counter.class);
Counter inSquare = dag.addOperator("CountInSquare", Counter.class);
Division division = dag.addOperator("Ratio", Division.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
index ce5ef9d..c50e17e 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java
@@ -25,10 +25,10 @@ import java.util.Map;
import javax.validation.constraints.NotNull;
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
/**
* <p>An operator which converts a raw value to a named value singleton list.</p>
@@ -47,9 +47,11 @@ public class NamedValueList<T> extends BaseOperator
private List<Map<String, T>> valueList;
private Map<String, T> valueMap;
- public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() {
+ public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>()
+ {
@Override
- public void process(T val) {
+ public void process(T val)
+ {
valueMap.put(valueName, val);
outPort.emit(valueList);
}
@@ -80,11 +82,13 @@ public class NamedValueList<T> extends BaseOperator
{
}
- public String getValueName() {
+ public String getValueName()
+ {
return valueName;
}
- public void setValueName(String name) {
+ public void setValueName(String name)
+ {
valueName = name;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
index 14edf19..8e61991 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
@@ -21,10 +21,10 @@ package com.datatorrent.demos.pi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
/**
* This operator implements Monte Carlo estimation of pi. For points randomly distributed points on
@@ -46,8 +46,7 @@ public class PiCalculateOperator extends BaseOperator
{
if (x == -1) {
x = tuple;
- }
- else {
+ } else {
y = tuple;
if (x * x + y * y <= base) {
inArea++;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
index b61077a..d8881c2 100644
--- a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
+++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java
@@ -18,13 +18,11 @@
*/
package com.datatorrent.demos.pi;
-
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
-
/**
*
*/
@@ -33,12 +31,12 @@ public class ApplicationTest
@Test
public void testSomeMethod() throws Exception
{
- LocalMode lma = LocalMode.newInstance();
- Configuration conf =new Configuration(false);
- conf.addResource("dt-site-pi.xml");
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.run(10000);
-
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-pi.xml");
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000);
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
index cd52873..8e12fcc 100644
--- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
+++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java
@@ -18,9 +18,8 @@
*/
package com.datatorrent.demos.pi;
-
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
@@ -33,7 +32,7 @@ public class CalculatorTest
public void testSomeMethod() throws Exception
{
LocalMode lma = LocalMode.newInstance();
- Configuration conf =new Configuration(false);
+ Configuration conf = new Configuration(false);
conf.addResource("dt-site-pilibrary.xml");
lma.prepareDAG(new Calculator(), conf);
LocalMode.Controller lc = lma.getController();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
index 8558554..cf49848 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java
@@ -25,11 +25,10 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.contrib.r.RScript;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.contrib.r.RScript;
/**
* @since 2.1.0
@@ -52,7 +51,8 @@ public class FaithfulRScript extends RScript
}
@InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() {
+ public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>()
+ {
@Override
public void process(FaithfulKey tuple)
{
@@ -65,7 +65,8 @@ public class FaithfulRScript extends RScript
};
@InputPortFieldAnnotation(optional = true)
- public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() {
+ public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>()
+ {
@Override
public void process(Integer eT)
{
@@ -82,9 +83,9 @@ public class FaithfulRScript extends RScript
@Override
public void endWindow()
{
-
- if (readingsList.size() == 0)
+ if (readingsList.size() == 0) {
return;
+ }
LOG.info("Input data size: readingsList - " + readingsList.size());
double[] eruptionDuration = new double[readingsList.size()];
@@ -106,6 +107,5 @@ public class FaithfulRScript extends RScript
super.process(map);
readingsList.clear();
map.clear();
-
- };
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
index 86abba7..c45cd50 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java
@@ -82,8 +82,9 @@ public class InputGenerator implements InputOperator
{
int id;
do {
- id = (int) Math.abs(Math.round(random.nextGaussian() * max));
- } while (id >= max);
+ id = (int)Math.abs(Math.round(random.nextGaussian() * max));
+ }
+ while (id >= max);
if (id < min) {
id = min;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
----------------------------------------------------------------------
diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
index 400e80c..0483767 100755
--- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
+++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java
@@ -23,11 +23,10 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
/**
* The application attempts to simulate 'Old Faithful Geyser" eruption.
@@ -38,7 +37,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
* waiting times and eruption duration values.
* For every application window, it generates only one 'elapsed time' input for which the
* prediction would be made.
- * Model in R is in file ruptionModel.R located at
+ * Model in R is in file ruptionModel.R located at
* demos/r/src/main/resources/com/datatorrent/demos/oldfaithful/ directory
*
* @since 2.1.0
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
index dc6a8cb..0bb1901 100755
--- a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
+++ b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.LocalMode;
public class OldFaithfulApplicationTest
{
-
+
private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class);
@Test
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
index fd2a430..b9d32ab 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java
@@ -18,6 +18,9 @@
*/
package com.datatorrent.demos.twitter;
+import java.net.URI;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator.InputPort;
@@ -31,10 +34,7 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import java.net.URI;
/**
* Twitter Demo Application: <br>
* This demo application samples random public status from twitter, send to Hashtag
@@ -167,7 +167,7 @@ import java.net.URI;
*
* @since 2.0.0
*/
-@ApplicationAnnotation(name="TwitterKinesisDemo")
+@ApplicationAnnotation(name = "TwitterKinesisDemo")
public class KinesisHashtagsApplication implements StreamingApplication
{
private final Locality locality = null;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
index 9bd81a4..8b9f447 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java
@@ -32,7 +32,7 @@ public class SlidingContainer<T> implements Serializable
T identifier;
int totalCount;
int position;
- int windowedCount[];
+ int[] windowedCount;
@SuppressWarnings("unused")
private SlidingContainer()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
index f61f5be..9edce64 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java
@@ -24,8 +24,6 @@ import java.sql.SQLException;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
-import twitter4j.Status;
-
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
@@ -34,6 +32,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import twitter4j.Status;
+
/**
* An application which connects to Twitter Sample Input and stores all the
* tweets with their usernames in a mysql database. Please review the docs
@@ -63,7 +63,7 @@ import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
*
* @since 0.9.4
*/
-@ApplicationAnnotation(name="TwitterDumpDemo")
+@ApplicationAnnotation(name = "TwitterDumpDemo")
public class TwitterDumpApplication implements StreamingApplication
{
public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
index ecc412f..3adbbe0 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java
@@ -23,14 +23,14 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
-import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
import twitter4j.Status;
/**
@@ -47,7 +47,7 @@ import twitter4j.Status;
*
* @since 1.0.2
*/
-@ApplicationAnnotation(name="TwitterDumpHBaseDemo")
+@ApplicationAnnotation(name = "TwitterDumpHBaseDemo")
public class TwitterDumpHBaseApplication implements StreamingApplication
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
index 5ed6774..d22db40 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java
@@ -18,12 +18,12 @@
*/
package com.datatorrent.demos.twitter;
-import twitter4j.HashtagEntity;
-import twitter4j.Status;
-
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.HashtagEntity;
+import twitter4j.Status;
/**
* <p>TwitterStatusHashtagExtractor class.</p>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
index ed4e207..6dbc436 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java
@@ -18,12 +18,13 @@
*/
package com.datatorrent.demos.twitter;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
import twitter4j.Status;
import twitter4j.URLEntity;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
index 1818dca..e05a37a 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.demos.twitter;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
import java.util.Arrays;
import java.util.HashSet;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
/**
* <p>TwitterStatusWordExtractor class.</p>
*
@@ -41,7 +41,7 @@ public class TwitterStatusWordExtractor extends BaseOperator
@Override
public void process(String text)
{
- String strs[] = text.split(" ");
+ String[] strs = text.split(" ");
if (strs != null) {
for (String str : strs) {
if (str != null && !filterList.contains(str) ) {
@@ -56,7 +56,7 @@ public class TwitterStatusWordExtractor extends BaseOperator
public void setup(OperatorContext context)
{
this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but",
- "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
- "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
+ "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
+ "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
index c8d3b00..731a38f 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
@@ -19,10 +19,14 @@
package com.datatorrent.demos.twitter;
import java.net.URI;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.collect.Maps;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -35,15 +39,10 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
/**
* Twitter Demo Application: <br>
* This demo application samples random public status from twitter, send to url
@@ -147,7 +146,7 @@ import java.util.Map;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name=TwitterTopCounterApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME)
public class TwitterTopCounterApplication implements StreamingApplication
{
public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json";
@@ -188,11 +187,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url");
}
- public static void consoleOutput(DAG dag,
- String operatorName,
- OutputPort<List<Map<String, Object>>> topCount,
- String schemaFile,
- String alias)
+ public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
{
String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
if (!StringUtils.isEmpty(gatewayAddress)) {
@@ -217,8 +212,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
dag.addStream("MapProvider", topCount, snapshotServer.input);
dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
- }
- else {
+ } else {
ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
operator.setStringFormat(operatorName + ": %s");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
index 8ed3678..3953ab7 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.demos.twitter;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,9 +26,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.algo.UniqueCounter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
/**
* This application is same as other twitter demo
@@ -43,7 +41,7 @@ import org.apache.hadoop.conf.Configuration;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name=TwitterTopWordsApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME)
public class TwitterTopWordsApplication implements StreamingApplication
{
public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json";
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
index 5246060..3597a92 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java
@@ -18,16 +18,13 @@
*/
package com.datatorrent.demos.twitter;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.algo.UniqueCounter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-
/**
* Twitter Demo Application: <br>
@@ -134,7 +131,7 @@ import org.apache.hadoop.conf.Configuration;
*
* @since 1.0.2
*/
-@ApplicationAnnotation(name=TwitterTrendingHashtagsApplication.APP_NAME)
+@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME)
public class TwitterTrendingHashtagsApplication implements StreamingApplication
{
public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json";
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
index 7f6f399..43ed8f7 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java
@@ -18,11 +18,11 @@
*/
package com.datatorrent.demos.twitter;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
/**
* <p>URLSerDe class.</p>
@@ -42,11 +42,9 @@ public class URLSerDe implements StreamCodec<byte[]>
{
if (fragment == null || fragment.buffer == null) {
return null;
- }
- else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
+ } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
return fragment.buffer;
- }
- else {
+ } else {
byte[] buffer = new byte[fragment.buffer.length];
System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length);
return buffer;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
index 449c903..20bb673 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
@@ -18,23 +18,25 @@
*/
package com.datatorrent.demos.twitter;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.*;
-import com.datatorrent.api.Context.OperatorContext;
-
-import com.datatorrent.common.util.BaseOperator;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
/**
*
* WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
@@ -114,8 +116,7 @@ public class WindowedTopCounter<T> extends BaseOperator
if (holder.totalCount == 0) {
iterator.remove();
- }
- else {
+ } else {
topCounter.add(holder);
if (--i == 0) {
break;
@@ -138,8 +139,7 @@ public class WindowedTopCounter<T> extends BaseOperator
topCounter.poll();
topCounter.add(holder);
smallest = topCounter.peek().totalCount;
- }
- else if (holder.totalCount == 0) {
+ } else if (holder.totalCount == 0) {
iterator.remove();
}
}
@@ -149,7 +149,7 @@ public class WindowedTopCounter<T> extends BaseOperator
Iterator<SlidingContainer<T>> topIter = topCounter.iterator();
- while(topIter.hasNext()) {
+ while (topIter.hasNext()) {
final SlidingContainer<T> wh = topIter.next();
Map<String, Object> tableRow = Maps.newHashMap();
@@ -254,8 +254,7 @@ public class WindowedTopCounter<T> extends BaseOperator
{
if (o1.totalCount > o2.totalCount) {
return 1;
- }
- else if (o1.totalCount < o2.totalCount) {
+ } else if (o1.totalCount < o2.totalCount) {
return -1;
}
@@ -274,8 +273,8 @@ public class WindowedTopCounter<T> extends BaseOperator
@Override
public int compare(Map<String, Object> o1, Map<String, Object> o2)
{
- Integer count1 = (Integer) o1.get(FIELD_COUNT);
- Integer count2 = (Integer) o2.get(FIELD_COUNT);
+ Integer count1 = (Integer)o1.get(FIELD_COUNT);
+ Integer count2 = (Integer)o2.get(FIELD_COUNT);
return count1.compareTo(count2);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
index a4daf09..cd211ff 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
@@ -19,10 +19,11 @@
package com.datatorrent.demos.twitter;
import org.junit.Test;
-import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
+import static org.junit.Assert.assertEquals;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
index 0ad4d18..91a4e20 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
@@ -18,11 +18,10 @@
*/
package com.datatorrent.demos.twitter;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.demos.twitter.TwitterTopCounterApplication;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
/**
* Test the DAG declaration in local mode.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
index a27c60f..4ac2e8d 100644
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
+++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
@@ -18,12 +18,11 @@
*/
package com.datatorrent.demos.twitter;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
/**
* Test the DAG declaration in local mode.
*/
@@ -38,9 +37,9 @@ public class TwitterTopWordsTest
@Test
public void testApplication() throws Exception
{
- TwitterTopWordsApplication app = new TwitterTopWordsApplication();
- Configuration conf =new Configuration(false);
- conf.addResource("dt-site-rollingtopwords.xml");
+ TwitterTopWordsApplication app = new TwitterTopWordsApplication();
+ Configuration conf = new Configuration(false);
+ conf.addResource("dt-site-rollingtopwords.xml");
LocalMode lma = LocalMode.newInstance();
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
index 1b27cea..57ef1a1 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.demos.uniquecount;
-
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
@@ -27,6 +26,7 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -34,8 +34,6 @@ import com.datatorrent.lib.stream.Counter;
import com.datatorrent.lib.stream.StreamDuplicater;
import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-
/**
* Application to demonstrate PartitionableUniqueCount operator. <br>
* The input operator generate random keys, which is sent to
@@ -45,7 +43,7 @@ import com.datatorrent.common.partitioner.StatelessPartitioner;
*
* @since 1.0.2
*/
-@ApplicationAnnotation(name="UniqueValueCountDemo")
+@ApplicationAnnotation(name = "UniqueValueCountDemo")
public class Application implements StreamingApplication
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
index 3a5140d..d201037 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
@@ -18,6 +18,9 @@
*/
package com.datatorrent.demos.uniquecount;
+import java.util.HashMap;
+import java.util.Map;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
@@ -25,9 +28,6 @@ import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.KeyHashValPair;
-import java.util.HashMap;
-import java.util.Map;
-
/*
Compare results and print non-matching values to console.
*/
@@ -41,35 +41,33 @@ public class CountVerifier<K> implements Operator
HashMap<K, Integer> map1 = new HashMap<K, Integer>();
HashMap<K, Integer> map2 = new HashMap<K, Integer>();
- public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in1 =
- new DefaultInputPort<KeyHashValPair<K, Integer>>()
- {
- @Override
- public void process(KeyHashValPair<K, Integer> tuple)
- {
- processTuple(tuple, map1);
- }
- };
+ public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
+ {
+ @Override
+ public void process(KeyHashValPair<K, Integer> tuple)
+ {
+ processTuple(tuple, map1);
+ }
+ };
- public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in2 =
- new DefaultInputPort<KeyHashValPair<K, Integer>>()
- {
- @Override
- public void process(KeyHashValPair<K, Integer> tuple)
- {
- processTuple(tuple, map2);
- }
- };
+ public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
+ {
+ @Override
+ public void process(KeyHashValPair<K, Integer> tuple)
+ {
+ processTuple(tuple, map2);
+ }
+ };
void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map)
{
map.put(tuple.getKey(), tuple.getValue());
}
- @OutputPortFieldAnnotation(optional=true)
- public transient final DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
- @OutputPortFieldAnnotation(optional=true)
- public transient final DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
@Override
public void beginWindow(long l)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
index 2742961..e806759 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
@@ -18,14 +18,17 @@
*/
package com.datatorrent.demos.uniquecount;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
-import java.util.Random;
-
/**
* Generate random Key value pairs.
* key is string and value is int, it emits the pair as KeyValPair on outPort,
@@ -36,6 +39,7 @@ public class RandomDataGenerator implements InputOperator
{
public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>();
private HashMap<String, Integer> dataInfo;
+ private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class);
private int count;
private int sleepMs = 10;
private int keyRange = 100;
@@ -51,15 +55,15 @@ public class RandomDataGenerator implements InputOperator
@Override
public void emitTuples()
{
- for(int i = 0 ; i < tupleBlast; i++) {
+ for (int i = 0; i < tupleBlast; i++) {
String key = String.valueOf(random.nextInt(keyRange));
int val = random.nextInt(valueRange);
outPort.emit(new KeyValPair<String, Object>(key, val));
}
try {
Thread.sleep(sleepMs);
- } catch(Exception ex) {
- System.out.println(ex.getMessage());
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage());
}
count++;
}
@@ -93,7 +97,7 @@ public class RandomDataGenerator implements InputOperator
@Override
public void endWindow()
{
- System.out.println("emitTuples called " + count + " times in this window");
+ LOG.debug("emitTuples called " + count + " times in this window");
count = 0;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
index feeb282..28f3bc0 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
@@ -18,16 +18,16 @@
*/
package com.datatorrent.demos.uniquecount;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* Input port operator for generating random values on keys. <br>
* Key(s) : key + integer in range between 0 and numKeys <br>
@@ -37,107 +37,117 @@ import java.util.Random;
*/
public class RandomKeyValues implements InputOperator
{
- public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>();
- private Random random = new Random(11111);
- private int numKeys;
- private int numValuesPerKeys;
- private int tuppleBlast = 1000;
- private int emitDelay = 20; /* 20 ms */
-
- /* For verification */
- private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
-
- public RandomKeyValues() {
- this.numKeys = 100;
- this.numValuesPerKeys = 100;
- }
-
- public RandomKeyValues(int keys, int values) {
- this.numKeys = keys;
- this.numValuesPerKeys = values;
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- }
-
- @Override
- public void teardown()
- {
- }
-
- @Override
- public void emitTuples()
- {
- /* generate tuples randomly, */
- for(int i = 0; i < tuppleBlast; i++) {
- int intKey = random.nextInt(numKeys);
- String key = "key" + String.valueOf(intKey);
- int value = random.nextInt(numValuesPerKeys);
-
- // update history for verifying later.
- BitSet bmap = history.get(intKey);
- if (bmap == null) {
- bmap = new BitSet();
- history.put(intKey, bmap);
- }
- bmap.set(value);
-
- // emit the key with value.
- outport.emit(new KeyValPair<String, Object>(key, value));
- }
- try
- {
- Thread.sleep(emitDelay);
- } catch (Exception e)
- {
- }
- }
-
- public int getNumKeys() {
- return numKeys;
- }
-
- public void setNumKeys(int numKeys) {
- this.numKeys = numKeys;
- }
-
- public int getNumValuesPerKeys() {
- return numValuesPerKeys;
- }
-
- public void setNumValuesPerKeys(int numValuesPerKeys) {
- this.numValuesPerKeys = numValuesPerKeys;
- }
-
- public int getTuppleBlast() {
- return tuppleBlast;
+ public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>();
+ private Random random = new Random(11111);
+ private int numKeys;
+ private int numValuesPerKeys;
+ private int tuppleBlast = 1000;
+ private int emitDelay = 20; /* 20 ms */
+
+ /* For verification */
+ private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
+
+ public RandomKeyValues()
+ {
+ this.numKeys = 100;
+ this.numValuesPerKeys = 100;
+ }
+
+ public RandomKeyValues(int keys, int values)
+ {
+ this.numKeys = keys;
+ this.numValuesPerKeys = values;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ /* generate tuples randomly, */
+ for (int i = 0; i < tuppleBlast; i++) {
+ int intKey = random.nextInt(numKeys);
+ String key = "key" + String.valueOf(intKey);
+ int value = random.nextInt(numValuesPerKeys);
+
+ // update history for verifying later.
+ BitSet bmap = history.get(intKey);
+ if (bmap == null) {
+ bmap = new BitSet();
+ history.put(intKey, bmap);
+ }
+ bmap.set(value);
+
+ // emit the key with value.
+ outport.emit(new KeyValPair<String, Object>(key, value));
}
-
- public void setTuppleBlast(int tuppleBlast) {
- this.tuppleBlast = tuppleBlast;
- }
-
- public int getEmitDelay() {
- return emitDelay;
- }
-
- public void setEmitDelay(int emitDelay) {
- this.emitDelay = emitDelay;
- }
-
- public void debug() {
-
+ try {
+ Thread.sleep(emitDelay);
+ } catch (Exception e) {
+ // Ignore.
}
+ }
+
+ public int getNumKeys()
+ {
+ return numKeys;
+ }
+
+ public void setNumKeys(int numKeys)
+ {
+ this.numKeys = numKeys;
+ }
+
+ public int getNumValuesPerKeys()
+ {
+ return numValuesPerKeys;
+ }
+
+ public void setNumValuesPerKeys(int numValuesPerKeys)
+ {
+ this.numValuesPerKeys = numValuesPerKeys;
+ }
+
+ public int getTuppleBlast()
+ {
+ return tuppleBlast;
+ }
+
+ public void setTuppleBlast(int tuppleBlast)
+ {
+ this.tuppleBlast = tuppleBlast;
+ }
+
+ public int getEmitDelay()
+ {
+ return emitDelay;
+ }
+
+ public void setEmitDelay(int emitDelay)
+ {
+ this.emitDelay = emitDelay;
+ }
+
+ public void debug()
+ {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
index 65b5c95..eb9d22c 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
@@ -18,14 +18,18 @@
*/
package com.datatorrent.demos.uniquecount;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.KeyHashValPair;
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import java.util.*;
/*
Generate random keys.
@@ -61,8 +65,7 @@ public class RandomKeysGenerator implements InputOperator
outPort.emit(key);
- if (verificationPort.isConnected())
- {
+ if (verificationPort.isConnected()) {
// maintain history for later verification.
MutableInt count = history.get(key);
if (count == null) {
@@ -74,10 +77,11 @@ public class RandomKeysGenerator implements InputOperator
}
try {
- if (sleepTime != 0)
+ if (sleepTime != 0) {
Thread.sleep(sleepTime);
+ }
} catch (Exception ex) {
-
+ // Ignore.
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
index 95323d5..eb9e392 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.demos.uniquecount;
-
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
@@ -27,19 +26,19 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-
/**
* <p>UniqueKeyValCountDemo class.</p>
*
* @since 1.0.2
*/
-@ApplicationAnnotation(name="UniqueKeyValueCountDemo")
+@ApplicationAnnotation(name = "UniqueKeyValueCountDemo")
public class UniqueKeyValCountDemo implements StreamingApplication
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
index 991a94d..66a0af1 100644
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
+++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
@@ -18,9 +18,9 @@
*/
package com.datatorrent.demos.uniquecount;
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
/**
* Test the DAG declaration in local mode.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
index 01e790a..a198247 100644
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
+++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
@@ -18,9 +18,9 @@
*/
package com.datatorrent.demos.uniquecount;
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
/**
* Test the DAG declaration in local mode.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
index 1028080..d0512cf 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.demos.wordcount;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import org.apache.hadoop.conf.Configuration;
-
/**
* Simple Word Count Demo : <br>
* This is application to count total occurrence of each word from file or any
@@ -72,8 +72,8 @@ import org.apache.hadoop.conf.Configuration;
* Streaming Window Size : 500ms
* Operator Details : <br>
* <ul>
- * <li>
- * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application.
+ * <li>
+ * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application.
* This can replaced by any input stream by user. <br>
* Class : {@link com.datatorrent.demos.wordcount.WordCountInputOperator} <br>
* Operator Application Window Count : 1 <br>
@@ -93,10 +93,10 @@ import org.apache.hadoop.conf.Configuration;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="WordCountDemo")
+@ApplicationAnnotation(name = "WordCountDemo")
public class Application implements StreamingApplication
{
- @Override
+ @Override
public void populateDAG(DAG dag, Configuration conf)
{
WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator());
@@ -104,8 +104,5 @@ public class Application implements StreamingApplication
dag.addStream("wordinput-count", input.outputPort, wordCount.data);
ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("count-console",wordCount.count, consoleOperator.input);
-
}
-
-
}