You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:24:05 UTC
[1/3] apex-malhar git commit: fixing all checkstyle violations,
delete maxAllowedViolations from pom
Repository: apex-malhar
Updated Branches:
refs/heads/master 623b803f5 -> e22ea0de1
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
index e2a94ef..29cd079 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.testbench.EventGenerator;
import java.util.ArrayList;
import java.util.HashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.testbench.EventGenerator;
+
/**
* HashMap Input Operator used as a helper in testbench benchmarking apps.
*
@@ -36,11 +38,15 @@ public class HashMapOperator implements InputOperator
{
private String keys = null;
private static final Logger logger = LoggerFactory.getLogger(EventGenerator.class);
- private String[] keysArray = {"a","b","c","d"};
- public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data = new DefaultOutputPort<HashMap<String, Double>>();
- public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data = new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
- public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data = new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
- public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data = new DefaultOutputPort<HashMap<String, Integer>>();
+ private String[] keysArray = {"a", "b", "c", "d"};
+ public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data =
+ new DefaultOutputPort<HashMap<String, Double>>();
+ public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data =
+ new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
+ public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data =
+ new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
+ public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data =
+ new DefaultOutputPort<HashMap<String, Integer>>();
private int numTuples = 1000;
private String seed = "a";
private int numKeys = 2;
@@ -89,7 +95,7 @@ public class HashMapOperator implements InputOperator
for (int j = 0; j < numKeys; j++) {
hmapMapTemp.put(keysArray[j], 100 * j);
}
- for (int j = 0; j < numKeys; j++) {
+ for (int j = 0; j < numKeys; j++) {
hmapMap.put(keysArray[j], hmapMapTemp);
}
hmapMap_data.emit(hmapMap);
@@ -107,7 +113,7 @@ public class HashMapOperator implements InputOperator
}
if (hmapInt_data.isConnected()) {
- for (int i = 0; i < numTuples; i++) {
+ for (int i = 0; i < numTuples; i++) {
HashMap<String, Integer> hmapMapTemp = new HashMap<String, Integer>();
for (int j = 0; j < numKeys; j++) {
hmapMapTemp.put(keysArray[j], 100 * j);
@@ -120,7 +126,8 @@ public class HashMapOperator implements InputOperator
@Override
public void beginWindow(long windowId)
{
- // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ // throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
@@ -132,13 +139,15 @@ public class HashMapOperator implements InputOperator
@Override
public void setup(OperatorContext context)
{
- // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ // throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void teardown()
{
- // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ // throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
index da757d9..df5b11e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
@@ -18,13 +18,14 @@
*/
package com.datatorrent.benchmark.testbench;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for RandomEventGenerator Operator.
@@ -37,14 +38,15 @@ public class RandomEventGeneratorApp implements StreamingApplication
{
private final Locality locality = null;
public static final int QUEUE_CAPACITY = 16 * 1024;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
RandomEventGenerator random = dag.addOperator("random", new RandomEventGenerator());
DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
DevNull<String> dev2 = dag.addOperator("dev2", new DevNull());
- dag.addStream("random1",random.integer_data,dev1.data).setLocality(locality);
- dag.addStream("random2",random.string_data,dev2.data).setLocality(locality);
+ dag.addStream("random1", random.integer_data, dev1.data).setLocality(locality);
+ dag.addStream("random2", random.string_data, dev2.data).setLocality(locality);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
index db18937..faafcbf 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
@@ -18,6 +18,10 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -26,9 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.SeedEventGenerator;
import com.datatorrent.lib.util.KeyValPair;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
+
/**
* Benchmark App for SeedEventGenerator Operator.
@@ -55,10 +57,12 @@ public class SeedEventGeneratorApp implements StreamingApplication
DevNull<HashMap<String, String>> devVal = dag.addOperator("devVal", new DevNull<HashMap<String, String>>());
DevNull<HashMap<String, ArrayList<Integer>>> devList = dag.addOperator("devList", new DevNull());
- dag.getMeta(seedEvent).getMeta(seedEvent.string_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(seedEvent).getMeta(seedEvent.string_data)
+ .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("SeedEventGeneratorString", seedEvent.string_data, devString.data).setLocality(locality);
- dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("SeedEventGeneratorKeyVal", seedEvent.keyvalpair_list, devKeyVal.data).setLocality(locality);
dag.getMeta(seedEvent).getMeta(seedEvent.val_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
index 5b66b54..d6e762e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
@@ -18,14 +18,16 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.ThroughputCounter;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for ThroughputCounter Operator.
@@ -38,14 +40,15 @@ public class ThroughputCounterApp implements StreamingApplication
{
public static final int QUEUE_CAPACITY = 16 * 1024;
private final Locality locality = null;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
ThroughputCounter counter = dag.addOperator("counter", new ThroughputCounter());
HashMapOperator oper = dag.addOperator("oper", new HashMapOperator());
- DevNull<HashMap<String,Number>> dev = dag.addOperator("dev", new DevNull());
- dag.addStream("count1",oper.hmapInt_data,counter.data).setLocality(locality);
- dag.addStream("count2",counter.count,dev.data).setLocality(locality);
+ DevNull<HashMap<String, Number>> dev = dag.addOperator("dev", new DevNull());
+ dag.addStream("count1", oper.hmapInt_data, counter.data).setLocality(locality);
+ dag.addStream("count2", counter.count, dev.data).setLocality(locality);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 64af9f9..7250e74 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -50,7 +50,8 @@ import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGen
import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.stream.DevNull;
-public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication
+public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator>
+ implements StreamingApplication
{
protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath";
protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store";
@@ -80,7 +81,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
// WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
// dag.addOperator("WatermarkGenerator", watermarkGenerator);
-// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL);
+// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput)
+// .setLocality(Locality.CONTAINER_LOCAL);
DevNull output = dag.addOperator("output", new DevNull());
dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
@@ -112,7 +114,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1)));
//accumulating mode
- windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
+ windowedOperator.setTriggerOption(TriggerOption.AtWatermark()
+ .withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
windowedOperator.setFixedWatermark(30000);
//windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
@@ -153,7 +156,6 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
return basePath;
}
-
public static class TestStatsListener implements StatsListener, Serializable
{
private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
index 5a9c955..19df8fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -41,7 +41,8 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.util.KeyValPair;
-public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
+public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<
+ KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
{
public KeyedWindowedOperatorBenchmarkApp()
{
@@ -58,7 +59,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
}
@Override
- protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+ protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator,
+ Configuration conf, SpillableComplexComponentImpl sccImpl)
{
windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl));
}
@@ -107,7 +109,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB
}
}
- protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
+ protected static class KeyedWindowedGenerator extends
+ AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
{
@Override
protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
index 98275ce..d96b453 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
@@ -35,7 +35,8 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.annotation.ApplicationAnnotation;
@ApplicationAnnotation(name = "WindowedOperatorBenchmark")
-public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
+public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<
+ WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator>
{
public WindowedOperatorBenchmarkApp()
{
@@ -49,7 +50,8 @@ public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchm
@Override
protected TimestampedTuple<Long> generateNextTuple()
{
- return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100));
+ return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000),
+ (long)random.nextInt(100));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
index 1d76855..cd8a3ec 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
@@ -18,16 +18,17 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.PortContext;
-
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
/**
* Test the DAG declaration in local mode.
*/
@@ -40,8 +41,10 @@ public class ApplicationFixedTest
new ApplicationFixed().populateDAG(lma.getDAG(), new Configuration(false));
DAG dag = lma.cloneDAG();
- FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag.getOperatorMeta("WordGenerator").getOperator();
- Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
+ FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag
+ .getOperatorMeta("WordGenerator").getOperator();
+ Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator)
+ .getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
LocalMode.Controller lc = lma.getController();
lc.run(60000);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
index 9439243..0a21a7c 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.Benchmark;
/**
* Test the DAG declaration in local mode.
@@ -38,7 +37,7 @@ public class BenchmarkTest
{
for (final Locality l : Locality.values()) {
logger.debug("Running the with {} locality", l);
- LocalMode.runApp(new Benchmark.AbstractApplication ()
+ LocalMode.runApp(new Benchmark.AbstractApplication()
{
@Override
public Locality getLocality()
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
index 7c9f892..6a1c968 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
@@ -18,14 +18,17 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+
+import org.junit.Test;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
public class CouchBaseBenchmarkTest
{
@@ -52,8 +55,7 @@ public class CouchBaseBenchmarkTest
LocalMode.Controller lc = lm.getController();
//lc.setHeartbeatMonitoringEnabled(false);
lc.run(20000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
logger.info(ex.getCause());
}
is.close();
@@ -76,8 +78,7 @@ public class CouchBaseBenchmarkTest
lm.prepareDAG(new CouchBaseAppInput(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
logger.info(ex.getCause());
}
is.close();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
index 5e407a0..e2936fe 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
@@ -27,6 +27,7 @@ import com.datatorrent.contrib.accumulo.AbstractAccumuloOutputOperator;
import com.datatorrent.contrib.accumulo.AccumuloRowTupleGenerator;
import com.datatorrent.contrib.accumulo.AccumuloTestHelper;
import com.datatorrent.contrib.accumulo.AccumuloTuple;
+
/**
* BenchMark Results
* -----------------
@@ -39,14 +40,16 @@ import com.datatorrent.contrib.accumulo.AccumuloTuple;
*
* @since 1.0.4
*/
-public class AccumuloApp implements StreamingApplication {
+public class AccumuloApp implements StreamingApplication
+{
@Override
- public void populateDAG(DAG dag, Configuration conf) {
+ public void populateDAG(DAG dag, Configuration conf)
+ {
AccumuloTestHelper.getConnector();
AccumuloTestHelper.clearTable();
dag.setAttribute(DAG.APPLICATION_NAME, "AccumuloOutputTest");
- AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator",AccumuloRowTupleGenerator.class);
+ AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator", AccumuloRowTupleGenerator.class);
TestAccumuloOutputOperator taop = dag.addOperator("testaccumulooperator", TestAccumuloOutputOperator.class);
dag.addStream("ss", rtg.outputPort, taop.input);
com.datatorrent.api.Attribute.AttributeMap attributes = dag.getAttributes();
@@ -58,12 +61,14 @@ public class AccumuloApp implements StreamingApplication {
}
- public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple> {
+ public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple>
+ {
@Override
- public Mutation operationMutation(AccumuloTuple t) {
+ public Mutation operationMutation(AccumuloTuple t)
+ {
Mutation mutation = new Mutation(t.getRow().getBytes());
- mutation.put(t.getColFamily().getBytes(),t.getColName().getBytes(),t.getColValue().getBytes());
+ mutation.put(t.getColFamily().getBytes(), t.getColName().getBytes(), t.getColValue().getBytes());
return mutation;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
index 8f0a1fd..8b47a9b 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
@@ -22,9 +22,11 @@ import org.junit.Test;
import com.datatorrent.api.LocalMode;
-public class AccumuloAppTest {
+public class AccumuloAppTest
+{
@Test
- public void testSomeMethod() throws Exception {
+ public void testSomeMethod() throws Exception
+ {
LocalMode.runApp(new AccumuloApp(), 30000);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
index 8f2e19f..14fe441 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
@@ -21,12 +21,13 @@ package com.datatorrent.benchmark.aerospike;
import org.junit.Test;
import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.aerospike.AerospikeOutputBenchmarkApplication;
-public class AerospikeBenchmarkAppTest {
+public class AerospikeBenchmarkAppTest
+{
@Test
- public void test() throws Exception {
+ public void test() throws Exception
+ {
LocalMode.runApp(new AerospikeOutputBenchmarkApplication(), 10000);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
index c54cbdf..079d073 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
@@ -18,10 +18,12 @@
*/
package com.datatorrent.benchmark.algo;
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Test the DAG declaration in local mode.
*/
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
index e85e38c..ec4f308 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
@@ -19,16 +19,18 @@
package com.datatorrent.benchmark.cassandra;
import org.junit.Test;
+
import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.cassandra.CassandraOutputBenchmarkApplication;
/**
* Test the DAG declaration in local mode.
*/
-public class CassandraApplicatonTest {
+public class CassandraApplicatonTest
+{
@Test
- public void test() throws Exception {
+ public void test() throws Exception
+ {
LocalMode.runApp(new CassandraOutputBenchmarkApplication(), 10000);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
index 1658ab1..32a4907 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
@@ -19,16 +19,19 @@
package com.datatorrent.benchmark.hbase;
import org.junit.Test;
+
import com.datatorrent.api.LocalMode;
/**
* Test the DAG declaration in local mode.
*/
-public class HBaseApplicationTest {
+public class HBaseApplicationTest
+{
@Test
- public void test() throws Exception {
- LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
+ public void test() throws Exception
+ {
+ LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
index 562559f..b61f1d3 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
@@ -37,7 +37,7 @@ import com.datatorrent.contrib.hbase.HBaseRowStringGenerator;
*
* @since 1.0.4
*/
-@ApplicationAnnotation(name="HBaseBenchmarkApp")
+@ApplicationAnnotation(name = "HBaseBenchmarkApp")
public class HBaseCsvMappingApplication implements StreamingApplication
{
private final Locality locality = null;
@@ -47,15 +47,12 @@ public class HBaseCsvMappingApplication implements StreamingApplication
{
HBaseRowStringGenerator row = dag.addOperator("rand", new HBaseRowStringGenerator());
-
HBaseCsvMappingPutOperator csvMappingPutOperator = dag.addOperator("HBaseoper", new HBaseCsvMappingPutOperator());
csvMappingPutOperator.getStore().setTableName("table1");
csvMappingPutOperator.getStore().setZookeeperQuorum("127.0.0.1");
csvMappingPutOperator.getStore().setZookeeperClientPort(2181);
csvMappingPutOperator.setMappingString("colfam0.street,colfam0.city,colfam0.state,row");
- dag.addStream("hbasestream",row.outputPort, csvMappingPutOperator.input).setLocality(locality);
+ dag.addStream("hbasestream", row.outputPort, csvMappingPutOperator.input).setLocality(locality);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
index fdab095..653c6f6 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
@@ -18,18 +18,21 @@
*/
package com.datatorrent.benchmark.hive;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.netlet.util.DTThrowable;
+
public class HiveInsertBenchmarkTest
{
private static final Logger LOG = LoggerFactory.getLogger(HiveInsertBenchmarkTest.class);
@@ -41,26 +44,30 @@ public class HiveInsertBenchmarkTest
InputStream inputStream = null;
try {
inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
- }
- catch (FileNotFoundException ex) {
- LOG.debug("Exception caught",ex);
+ } catch (FileNotFoundException ex) {
+ LOG.debug("Exception caught", ex);
}
conf.addResource(inputStream);
- LOG.debug("conf properties are {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
- LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
- LOG.debug("maximum length is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
- LOG.debug("tablename is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
- LOG.debug("permission is {}",conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
+ LOG.debug("conf properties are {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ LOG.debug("conf dburl is {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+ LOG.debug("conf filepath is {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+ LOG.debug("maximum length is {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
+ LOG.debug("tablename is {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+ LOG.debug("permission is {}",
+ conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
HiveInsertBenchmarkingApp app = new HiveInsertBenchmarkingApp();
LocalMode lm = LocalMode.newInstance();
try {
lm.prepareDAG(app, conf);
LocalMode.Controller lc = lm.getController();
lc.run(120000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
DTThrowable.rethrow(ex);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
index 3acba3e..e0097c6 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.benchmark.hive;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.SQLException;
@@ -28,16 +30,13 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
-
import com.datatorrent.api.LocalMode;
-
import com.datatorrent.netlet.util.DTThrowable;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
public class HiveMapBenchmarkTest
{
private static final Logger LOG = LoggerFactory.getLogger(HiveMapBenchmarkTest.class);
+
@Test
public void testMethod() throws SQLException
{
@@ -45,18 +44,24 @@ public class HiveMapBenchmarkTest
InputStream inputStream = null;
try {
inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
- }
- catch (FileNotFoundException ex) {
- LOG.debug("Exception caught {}",ex);
+ } catch (FileNotFoundException ex) {
+ LOG.debug("Exception caught {}", ex);
}
conf.addResource(inputStream);
- LOG.debug("conf properties are {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
- LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
- LOG.debug("maximum length is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
- LOG.debug("tablename is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
- LOG.debug("permission is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
- LOG.debug("delimiter is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
+ LOG.debug("conf properties are {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ LOG.debug("conf dburl is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+ LOG.debug("conf filepath is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+ LOG.debug("maximum length is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
+ LOG.debug("tablename is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+ LOG.debug("permission is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
+ LOG.debug("delimiter is {}",
+ conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
HiveMapInsertBenchmarkingApp app = new HiveMapInsertBenchmarkingApp();
LocalMode lm = LocalMode.newInstance();
@@ -64,14 +69,11 @@ public class HiveMapBenchmarkTest
lm.prepareDAG(app, conf);
LocalMode.Controller lc = lm.getController();
lc.run(30000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
DTThrowable.rethrow(ex);
}
IOUtils.closeQuietly(inputStream);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
index e2d2f6a..6cb901a 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
@@ -43,8 +43,7 @@ public class KafkaInputBenchmarkTest
lma.prepareDAG(new KafkaInputBenchmark(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(30000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
index d85372f..4de7193 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
-
public class KafkaOutputBenchmarkTest
{
@Test
@@ -44,8 +43,7 @@ public class KafkaOutputBenchmarkTest
lma.prepareDAG(new KafkaInputBenchmark(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(30000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
index 4c046fb..9201cd5 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
@@ -18,13 +18,16 @@
*/
package com.datatorrent.benchmark.memsql;
-import com.datatorrent.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.memsql.MemsqlInputOperator;
import com.datatorrent.lib.stream.DevNull;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* BenchMark Results
@@ -39,7 +42,7 @@ import org.slf4j.LoggerFactory;
*
* @since 1.0.5
*/
-@ApplicationAnnotation(name="MemsqlInputBenchmark")
+@ApplicationAnnotation(name = "MemsqlInputBenchmark")
public class MemsqlInputBenchmark implements StreamingApplication
{
private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmark.class);
@@ -50,13 +53,13 @@ public class MemsqlInputBenchmark implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
MemsqlInputOperator memsqlInputOperator = dag.addOperator("memsqlInputOperator",
- new MemsqlInputOperator());
+ new MemsqlInputOperator());
DevNull<Object> devNull = dag.addOperator("devnull",
- new DevNull<Object>());
+ new DevNull<Object>());
dag.addStream("memsqlconnector",
- memsqlInputOperator.outputPort,
- devNull.data);
+ memsqlInputOperator.outputPort,
+ devNull.data);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
index 55fec7c..fa98a18 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
@@ -18,26 +18,33 @@
*/
package com.datatorrent.benchmark.memsql;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Operator.ProcessingMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.*;
-import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
-import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Random;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.netlet.util.DTThrowable;
+
+import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
+
public class MemsqlInputBenchmarkTest
{
private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmarkTest.class);
@@ -52,28 +59,33 @@ public class MemsqlInputBenchmarkTest
MemsqlStore memsqlStore = new MemsqlStore();
memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
- memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+ memsqlStore.setConnectionProperties(
+ conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator();
- outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
- outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+ outputOperator.getStore().setDatabaseUrl(
+ conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
+ outputOperator.getStore().setConnectionProperties(
+ conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
outputOperator.setBatchSize(BATCH_SIZE);
Random random = new Random();
- com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context =
+ new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
long seedSize = conf.getLong("dt.seedSize", SEED_SIZE);
outputOperator.setup(context);
outputOperator.beginWindow(0);
- for(long valueCounter = 0;
+ for (long valueCounter = 0;
valueCounter < seedSize;
valueCounter++) {
outputOperator.input.put(random.nextInt());
@@ -89,8 +101,7 @@ public class MemsqlInputBenchmarkTest
lm.prepareDAG(app, conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
DTThrowable.rethrow(ex);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
index 8534e20..297bc6d 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.memsql;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* BenchMark Results
@@ -41,10 +43,10 @@ import org.slf4j.LoggerFactory;
*
* @since 1.0.5
*/
-@ApplicationAnnotation(name="MemsqlOutputBenchmark")
+@ApplicationAnnotation(name = "MemsqlOutputBenchmark")
public class MemsqlOutputBenchmark implements StreamingApplication
{
- private static transient final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
+ private static final transient Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final int MAX_WINDOW_COUNT = 10000;
@@ -61,7 +63,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication
@Override
public void emitTuples()
{
- if(done) {
+ if (done) {
return;
}
@@ -73,8 +75,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication
{
try {
super.endWindow();
- }
- catch(Exception e) {
+ } catch (Exception e) {
done = true;
}
}
@@ -83,20 +84,21 @@ public class MemsqlOutputBenchmark implements StreamingApplication
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- CustomRandomEventGenerator randomEventGenerator = dag.addOperator("randomEventGenerator", new CustomRandomEventGenerator());
+ CustomRandomEventGenerator randomEventGenerator = dag.addOperator(
+ "randomEventGenerator", new CustomRandomEventGenerator());
randomEventGenerator.setMaxCountOfWindows(MAX_WINDOW_COUNT);
randomEventGenerator.setTuplesBlastIntervalMillis(TUPLE_BLAST_MILLIS);
randomEventGenerator.setTuplesBlast(TUPLE_BLAST);
LOG.debug("Before making output operator");
MemsqlPOJOOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
- new MemsqlPOJOOutputOperator());
+ new MemsqlPOJOOutputOperator());
LOG.debug("After making output operator");
memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE);
dag.addStream("memsqlConnector",
- randomEventGenerator.integer_data,
- memsqlOutputOperator.input);
+ randomEventGenerator.integer_data,
+ memsqlOutputOperator.input);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
index bab0c9e..bf82ab3 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
@@ -18,20 +18,23 @@
*/
package com.datatorrent.benchmark.memsql;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
-import com.datatorrent.contrib.memsql.MemsqlStore;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.netlet.util.DTThrowable;
+
public class MemsqlOutputBenchmarkTest
{
private static final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmarkTest.class);
@@ -45,7 +48,8 @@ public class MemsqlOutputBenchmarkTest
MemsqlStore memsqlStore = new MemsqlStore();
memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
- memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
+ memsqlStore.setConnectionProperties(
+ conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
@@ -56,8 +60,7 @@ public class MemsqlOutputBenchmarkTest
lm.prepareDAG(app, conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
DTThrowable.rethrow(ex);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
index 9f82e79..d270e7f 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
@@ -18,14 +18,17 @@
*/
package com.datatorrent.benchmark.script;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for Ruby Operator in local mode.
*/
-public class RubyOperatorBenchmarkAppTest {
+public class RubyOperatorBenchmarkAppTest
+{
@Test
public void testApplication() throws Exception
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
index 7e64c5f..b87fec1 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -35,7 +35,6 @@ import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.datatorrent.lib.fileaccess.TFileImpl;
-
public class SpillableDSBenchmarkTest
{
private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
@@ -57,7 +56,6 @@ public class SpillableDSBenchmarkTest
@Rule
public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
-
@Before
public void setup()
{
@@ -116,7 +114,8 @@ public class SpillableDSBenchmarkTest
long spentTime = System.currentTimeMillis() - startTime;
if (spentTime > outputTimes * 5000) {
++outputTimes;
- logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i, i * 1000 / spentTime);
+ logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}",
+ spentTime, i, i * 1000 / spentTime);
checkEnvironment();
}
}
@@ -126,7 +125,6 @@ public class SpillableDSBenchmarkTest
loopCount / spentTime);
}
-
public void putEntry(SpillableMapImpl<String, String> map)
{
map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 4f03a10..dc8f4b4 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -34,7 +34,6 @@ import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
/**
* This is not a really unit test, but in fact a benchmark runner.
* Provides this class to give developers the convenience to run in local IDE environment.
- *
*/
public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
{
@@ -91,8 +90,6 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
lc.shutdown();
}
-
-
@Override
public String getStoreBasePath(Configuration conf)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
index 14fd7e3..99d8a1f 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for EventClassifier Operator in local mode.
*/
@@ -47,9 +50,8 @@ public class EventClassifierAppTest
lm.prepareDAG(new EventClassifierApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
index b793ecd..929d8bc 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for EventClassifierNumberToHashDouble Operator in local mode.
*/
@@ -48,9 +51,8 @@ public class EventClassifierNumberToHashDoubleAppTest
lm.prepareDAG(new EventClassifierNumberToHashDoubleApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
index 4808ff2..5a427a5 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for EventGenerator Operator in local mode.
*/
@@ -50,9 +53,8 @@ public class EventGeneratorAppTest
lm.prepareDAG(new EventGeneratorApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
index 4384256..1a85a7b 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for EventIncrementerApp Operator in local mode.
*/
@@ -48,9 +51,8 @@ public class EventIncrementerAppTest
lm.prepareDAG(new EventIncrementerApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
index e1368ba..9419022 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
@@ -19,6 +19,8 @@
package com.datatorrent.benchmark.testbench;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.InputStream;
import org.junit.Test;
@@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+
/**
* Benchmark Test for FilterClassifierApp Operator in local mode.
*/
@@ -49,9 +50,8 @@ public class FilterClassifierAppTest
lm.prepareDAG(new FilterClassifierApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
index 95453f2..977d6b7 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for FilteredEventClassifierApp Operator in local mode.
*/
@@ -47,9 +50,8 @@ public class FilteredEventClassifierAppTest
lm.prepareDAG(new FilteredEventClassifierApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
index cc180f0..92ca0fd 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
@@ -18,16 +18,19 @@
*/
package com.datatorrent.benchmark.testbench;
-import com.datatorrent.api.LocalMode;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
/**
* Benchmark Test for ThroughputCounterApp Operator in local mode.
*/
@@ -47,9 +50,8 @@ public class ThroughputCounterAppTest
lm.prepareDAG(new ThroughputCounterApp(), conf);
LocalMode.Controller lc = lm.getController();
lc.run(20000);
- }
- catch (Exception ex) {
- logger.info(ex.getMessage());
+ } catch (Exception ex) {
+ logger.info(ex.getMessage());
}
is.close();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
index 98ecf67..157accc 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
@@ -40,7 +40,6 @@ public class GenericSerdePerformanceTest
private Random random = new Random();
private int serdeDataSize = 1000000;
-
@Test
public void testCompareSerdeForString()
{
@@ -74,7 +73,6 @@ public class GenericSerdePerformanceTest
buffer.release();
}
-
@Test
public void testCompareSerdeForRealCase()
{
@@ -88,7 +86,6 @@ public class GenericSerdePerformanceTest
long genericSerdeCost = System.currentTimeMillis() - beginTime;
logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost);
-
beginTime = System.currentTimeMillis();
Kryo kryo = new Kryo();
for (int i = 0; i < serdeDataSize; ++i) {
@@ -99,7 +96,6 @@ public class GenericSerdePerformanceTest
long kryoSerdeCost = System.currentTimeMillis() - beginTime;
logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost);
-
beginTime = System.currentTimeMillis();
Kryo kryo1 = new Kryo();
for (int i = 0; i < serdeDataSize; ++i) {
@@ -113,6 +109,7 @@ public class GenericSerdePerformanceTest
protected ImmutablePair generatePair(long now)
{
- return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000));
+ return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100),
+ random.nextInt(100)), "" + random.nextInt(1000));
}
}
[3/3] apex-malhar git commit: Merge commit 'refs/pull/539/head' of
https://github.com/apache/apex-malhar
Posted by th...@apache.org.
Merge commit 'refs/pull/539/head' of https://github.com/apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e22ea0de
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e22ea0de
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e22ea0de
Branch: refs/heads/master
Commit: e22ea0de192967e2006071dd191cb25d4173cf31
Parents: 623b803 5528a4c
Author: Thomas Weise <th...@apache.org>
Authored: Sun Jan 22 11:22:32 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Jan 22 11:22:32 2017 -0800
----------------------------------------------------------------------
benchmark/pom.xml | 8 ----
.../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
.../com/datatorrent/benchmark/Benchmark.java | 17 +++----
.../benchmark/CouchBaseAppInput.java | 6 +--
.../benchmark/CouchBaseAppOutput.java | 8 ++--
.../benchmark/CouchBaseInputOperator.java | 17 ++++---
.../benchmark/FixedTuplesInputOperator.java | 11 +++--
.../datatorrent/benchmark/RandomMapOutput.java | 40 +++++++++-------
.../benchmark/RandomWordInputModule.java | 11 +++--
.../benchmark/WordCountOperator.java | 10 ++--
.../AerospikeOutputBenchmarkApplication.java | 17 ++++---
.../aerospike/AerospikeOutputOperator.java | 13 ++++--
.../UniqueValueCountBenchmarkApplication.java | 21 +++++----
.../CassandraOutputBenchmarkApplication.java | 17 ++++---
.../cassandra/CassandraOutputOperator.java | 14 +++---
.../benchmark/fs/FSByteOutputOperator.java | 9 ++--
.../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
.../hive/HiveInsertBenchmarkingApp.java | 28 ++++++-----
.../hive/HiveMapInsertBenchmarkingApp.java | 31 +++++++------
.../kafka/BenchmarkKafkaInputOperator.java | 6 +--
...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
.../benchmark/kafka/KafkaInputBenchmark.java | 23 ++++-----
.../benchmark/kafka/KafkaOutputBenchmark.java | 5 +-
.../benchmark/kafka/KafkaTestPartitioner.java | 7 +--
.../RubyOperatorBenchmarkApplication.java | 11 +++--
.../spillable/SpillableTestOperator.java | 3 +-
.../state/ManagedStateBenchmarkApp.java | 3 +-
.../benchmark/state/StoreOperator.java | 9 ++--
.../stream/DevNullCounterBenchmark.java | 13 +++---
.../benchmark/stream/IntegerOperator.java | 17 ++++---
.../benchmark/stream/StreamDuplicaterApp.java | 21 +++++----
.../benchmark/stream/StreamMergeApp.java | 5 +-
.../benchmark/testbench/EventClassifierApp.java | 11 +++--
.../EventClassifierNumberToHashDoubleApp.java | 18 ++++---
.../benchmark/testbench/EventGeneratorApp.java | 9 ++--
.../testbench/EventIncrementerApp.java | 22 +++++----
.../testbench/FilterClassifierApp.java | 15 +++---
.../testbench/FilteredEventClassifierApp.java | 8 ++--
.../benchmark/testbench/HashMapOperator.java | 37 +++++++++------
.../testbench/RandomEventGeneratorApp.java | 8 ++--
.../testbench/SeedEventGeneratorApp.java | 14 ++++--
.../testbench/ThroughputCounterApp.java | 13 ++++--
.../AbstractWindowedOperatorBenchmarkApp.java | 10 ++--
.../KeyedWindowedOperatorBenchmarkApp.java | 9 ++--
.../window/WindowedOperatorBenchmarkApp.java | 6 ++-
.../benchmark/ApplicationFixedTest.java | 17 ++++---
.../datatorrent/benchmark/BenchmarkTest.java | 3 +-
.../benchmark/CouchBaseBenchmarkTest.java | 13 +++---
.../benchmark/accumulo/AccumuloApp.java | 17 ++++---
.../benchmark/accumulo/AccumuloAppTest.java | 8 ++--
.../aerospike/AerospikeBenchmarkAppTest.java | 7 +--
.../algo/UniqueValueCountBenchmarkTest.java | 6 ++-
.../cassandra/CassandraApplicatonTest.java | 8 ++--
.../benchmark/hbase/HBaseApplicationTest.java | 9 ++--
.../hbase/HBaseCsvMappingApplication.java | 7 +--
.../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
.../benchmark/hive/HiveMapBenchmarkTest.java | 38 ++++++++-------
.../kafka/KafkaInputBenchmarkTest.java | 3 +-
.../kafka/KafkaOutputBenchmarkTest.java | 4 +-
.../benchmark/memsql/MemsqlInputBenchmark.java | 21 +++++----
.../memsql/MemsqlInputBenchmarkTest.java | 49 ++++++++++++--------
.../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
.../memsql/MemsqlOutputBenchmarkTest.java | 21 +++++----
.../script/RubyOperatorBenchmarkAppTest.java | 7 ++-
.../spillable/SpillableDSBenchmarkTest.java | 6 +--
.../state/ManagedStateBenchmarkAppTest.java | 3 --
.../testbench/EventClassifierAppTest.java | 12 +++--
...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
.../testbench/EventGeneratorAppTest.java | 12 +++--
.../testbench/EventIncrementerAppTest.java | 12 +++--
.../testbench/FilterClassifierAppTest.java | 10 ++--
.../FilteredEventClassifierAppTest.java | 12 +++--
.../testbench/ThroughputCounterAppTest.java | 12 +++--
.../util/serde/GenericSerdePerformanceTest.java | 7 +--
74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
[2/3] apex-malhar git commit: fixing all checkstyle violations,
delete maxAllowedViolations from pom
Posted by th...@apache.org.
fixing all checkstyle violations,
delete maxAllowedViolations from pom
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5528a4c6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5528a4c6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5528a4c6
Branch: refs/heads/master
Commit: 5528a4c639a87dbfaba4a2bae68ac99971c66224
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 14:43:33 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:12:20 2017 -0800
----------------------------------------------------------------------
benchmark/pom.xml | 8 ----
.../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
.../com/datatorrent/benchmark/Benchmark.java | 17 +++----
.../benchmark/CouchBaseAppInput.java | 6 +--
.../benchmark/CouchBaseAppOutput.java | 8 ++--
.../benchmark/CouchBaseInputOperator.java | 17 ++++---
.../benchmark/FixedTuplesInputOperator.java | 11 +++--
.../datatorrent/benchmark/RandomMapOutput.java | 40 +++++++++-------
.../benchmark/RandomWordInputModule.java | 11 +++--
.../benchmark/WordCountOperator.java | 10 ++--
.../AerospikeOutputBenchmarkApplication.java | 17 ++++---
.../aerospike/AerospikeOutputOperator.java | 13 ++++--
.../UniqueValueCountBenchmarkApplication.java | 21 +++++----
.../CassandraOutputBenchmarkApplication.java | 17 ++++---
.../cassandra/CassandraOutputOperator.java | 14 +++---
.../benchmark/fs/FSByteOutputOperator.java | 9 ++--
.../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
.../hive/HiveInsertBenchmarkingApp.java | 28 ++++++-----
.../hive/HiveMapInsertBenchmarkingApp.java | 31 +++++++------
.../kafka/BenchmarkKafkaInputOperator.java | 6 +--
...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
.../benchmark/kafka/KafkaInputBenchmark.java | 23 ++++-----
.../benchmark/kafka/KafkaOutputBenchmark.java | 5 +-
.../benchmark/kafka/KafkaTestPartitioner.java | 7 +--
.../RubyOperatorBenchmarkApplication.java | 11 +++--
.../spillable/SpillableTestOperator.java | 3 +-
.../state/ManagedStateBenchmarkApp.java | 3 +-
.../benchmark/state/StoreOperator.java | 9 ++--
.../stream/DevNullCounterBenchmark.java | 13 +++---
.../benchmark/stream/IntegerOperator.java | 17 ++++---
.../benchmark/stream/StreamDuplicaterApp.java | 21 +++++----
.../benchmark/stream/StreamMergeApp.java | 5 +-
.../benchmark/testbench/EventClassifierApp.java | 11 +++--
.../EventClassifierNumberToHashDoubleApp.java | 18 ++++---
.../benchmark/testbench/EventGeneratorApp.java | 9 ++--
.../testbench/EventIncrementerApp.java | 22 +++++----
.../testbench/FilterClassifierApp.java | 15 +++---
.../testbench/FilteredEventClassifierApp.java | 8 ++--
.../benchmark/testbench/HashMapOperator.java | 37 +++++++++------
.../testbench/RandomEventGeneratorApp.java | 8 ++--
.../testbench/SeedEventGeneratorApp.java | 14 ++++--
.../testbench/ThroughputCounterApp.java | 13 ++++--
.../AbstractWindowedOperatorBenchmarkApp.java | 10 ++--
.../KeyedWindowedOperatorBenchmarkApp.java | 9 ++--
.../window/WindowedOperatorBenchmarkApp.java | 6 ++-
.../benchmark/ApplicationFixedTest.java | 17 ++++---
.../datatorrent/benchmark/BenchmarkTest.java | 3 +-
.../benchmark/CouchBaseBenchmarkTest.java | 13 +++---
.../benchmark/accumulo/AccumuloApp.java | 17 ++++---
.../benchmark/accumulo/AccumuloAppTest.java | 8 ++--
.../aerospike/AerospikeBenchmarkAppTest.java | 7 +--
.../algo/UniqueValueCountBenchmarkTest.java | 6 ++-
.../cassandra/CassandraApplicatonTest.java | 8 ++--
.../benchmark/hbase/HBaseApplicationTest.java | 9 ++--
.../hbase/HBaseCsvMappingApplication.java | 7 +--
.../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
.../benchmark/hive/HiveMapBenchmarkTest.java | 38 ++++++++-------
.../kafka/KafkaInputBenchmarkTest.java | 3 +-
.../kafka/KafkaOutputBenchmarkTest.java | 4 +-
.../benchmark/memsql/MemsqlInputBenchmark.java | 21 +++++----
.../memsql/MemsqlInputBenchmarkTest.java | 49 ++++++++++++--------
.../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
.../memsql/MemsqlOutputBenchmarkTest.java | 21 +++++----
.../script/RubyOperatorBenchmarkAppTest.java | 7 ++-
.../spillable/SpillableDSBenchmarkTest.java | 6 +--
.../state/ManagedStateBenchmarkAppTest.java | 3 --
.../testbench/EventClassifierAppTest.java | 12 +++--
...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
.../testbench/EventGeneratorAppTest.java | 12 +++--
.../testbench/EventIncrementerAppTest.java | 12 +++--
.../testbench/FilterClassifierAppTest.java | 10 ++--
.../FilteredEventClassifierAppTest.java | 12 +++--
.../testbench/ThroughputCounterAppTest.java | 12 +++--
.../util/serde/GenericSerdePerformanceTest.java | 7 +--
74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index b2e0981..4bbd5ac 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -143,14 +143,6 @@
<skip>true</skip>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <maxAllowedViolations>281</maxAllowedViolations>
- <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
- </configuration>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
index 53f01fc..aa10eea 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
@@ -18,13 +18,15 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
* Example of application configuration in Java.
@@ -34,7 +36,7 @@ import org.apache.hadoop.conf.Configuration;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PerformanceBenchmarkForFixedNumberOfTuples")
+@ApplicationAnnotation(name = "PerformanceBenchmarkForFixedNumberOfTuples")
public class ApplicationFixed implements StreamingApplication
{
private final Locality locality = null;
@@ -44,7 +46,8 @@ public class ApplicationFixed implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
FixedTuplesInputOperator wordGenerator = dag.addOperator("WordGenerator", FixedTuplesInputOperator.class);
- dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
wordGenerator.setCount(500000);
WordCountOperator<byte[]> counter = dag.addOperator("Counter", new WordCountOperator<byte[]>());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
index 5649914..d8d51b8 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
@@ -52,10 +52,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
* @since 0.9.0
*/
-@ApplicationAnnotation(name="PerformanceBenchmarkingApp")
+@ApplicationAnnotation(name = "PerformanceBenchmarkingApp")
public abstract class Benchmark
{
- static abstract class AbstractApplication implements StreamingApplication
+ abstract static class AbstractApplication implements StreamingApplication
{
public static final int QUEUE_CAPACITY = 32 * 1024;
@@ -63,7 +63,8 @@ public abstract class Benchmark
public void populateDAG(DAG dag, Configuration conf)
{
RandomWordInputModule wordGenerator = dag.addOperator("wordGenerator", RandomWordInputModule.class);
- dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
WordCountOperator<byte[]> counter = dag.addOperator("counter", new WordCountOperator<byte[]>());
dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
@@ -77,7 +78,7 @@ public abstract class Benchmark
/**
* Let the engine decide how to best place the 2 operators.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkNoLocality")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkNoLocality")
public static class NoLocality extends AbstractApplication
{
@Override
@@ -92,7 +93,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same Rack.
* The operators are requested to be deployed on different machines.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkRackLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkRackLocal")
public static class RackLocal extends AbstractApplication
{
@Override
@@ -107,7 +108,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same node.
* The operators are requested to be deployed as different processes within the same machine.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkNodeLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkNodeLocal")
public static class NodeLocal extends AbstractApplication
{
@Override
@@ -122,7 +123,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same container.
* The operators are deployed as different threads in the same process.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkContainerLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkContainerLocal")
public static class ContainerLocal extends AbstractApplication
{
@Override
@@ -136,7 +137,7 @@ public abstract class Benchmark
/**
* Place the 2 operators so that they are in the same thread.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkThreadLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkThreadLocal")
public static class ThreadLocal extends AbstractApplication
{
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
index 6096530..bf5b876 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
- *
* Application to benchmark the performance of couchbase input operator.
* The number of tuples processed per second were around 9000.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index f789d08..4f12791 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -18,12 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
/**
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
index 923b588..8ae0a94 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
@@ -18,12 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
-import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
import java.util.ArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
+import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
+
/**
* <p>CouchBaseInputOperator class.</p>
*
@@ -32,14 +34,15 @@ import org.slf4j.LoggerFactory;
public class CouchBaseInputOperator extends AbstractCouchBaseInputOperator<String>
{
private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class);
+
@Override
public String getTuple(Object object)
{
- if(object!=null)
- return object.toString();
- else{
- logger.info("Object returned is null");
- return "null";
+ if (object != null) {
+ return object.toString();
+ } else {
+ logger.info("Object returned is null");
+ return "null";
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
index ad7f8c1..f2582bd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
@@ -18,14 +18,15 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
+import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
/**
* <p>FixedTuplesInputOperator class.</p>
@@ -44,7 +45,7 @@ public class FixedTuplesInputOperator implements InputOperator
{
if (firstTime) {
long start = System.currentTimeMillis();
- for (int i = count; i-- > 0;) {
+ for (int i = count; i-- > 0; ) {
output.emit(new byte[64]);
}
firstTime = false;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
index 106bd79..3342771 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
@@ -19,20 +19,23 @@
package com.datatorrent.benchmark;
import java.util.HashMap;
-import com.datatorrent.common.util.BaseOperator;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
/**
* Operator that outputs random values in a map.
*
* @since 1.0.4
*/
-public class RandomMapOutput extends BaseOperator {
+public class RandomMapOutput extends BaseOperator
+{
- public final transient DefaultOutputPort<HashMap<String, Object>> map_data = new DefaultOutputPort<HashMap<String, Object>>();
+ public final transient DefaultOutputPort<HashMap<String, Object>> map_data =
+ new DefaultOutputPort<HashMap<String, Object>>();
public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
- {
+ {
@Override
public void process(Integer tuple)
{
@@ -40,22 +43,25 @@ public class RandomMapOutput extends BaseOperator {
map.put(key, tuple);
RandomMapOutput.this.process(map);
}
- };
+ };
- private String key;
+ private String key;
- public String getKey() {
- return key;
- }
+ public String getKey()
+ {
+ return key;
+ }
- public void setKey(String key) {
- this.key = key;
- }
+ public void setKey(String key)
+ {
+ this.key = key;
+ }
- public void process(HashMap<String, Object> tuple) {
+ public void process(HashMap<String, Object> tuple)
+ {
- if (map_data.isConnected()) {
- map_data.emit(tuple);
- }
- }
+ if (map_data.isConnected()) {
+ map_data.emit(tuple);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
index 11c7568..7d02de2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
@@ -18,10 +18,12 @@
*/
package com.datatorrent.benchmark;
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.Context.OperatorContext;
+
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import javax.validation.constraints.Min;
/**
* <p>
@@ -87,7 +89,6 @@ public class RandomWordInputModule implements InputOperator
return emitSameTuple;
}
-
/**
* Emits byte array of specified size.
* Emits either the same byte array or creates new byte array every time
@@ -103,11 +104,11 @@ public class RandomWordInputModule implements InputOperator
final boolean EMIT_SAME_TUPLE_COPY = emitSameTuple;
if (firstTime) {
if (EMIT_SAME_TUPLE_COPY) {
- for (int i = count--; i-- > 0;) {
+ for (int i = count--; i-- > 0; ) {
output.emit(sameTupleArray);
}
} else {
- for (int i = count--; i-- > 0;) {
+ for (int i = count--; i-- > 0; ) {
output.emit(new byte[TUPLE_SIZE_COPY]);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 098ed42..6e91482 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -21,14 +21,17 @@ package com.datatorrent.benchmark;
/*
* To change this template, choose Tools | Templates and open the template in the editor.
*/
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.OperatorContext;
import java.util.ArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
/**
* <p>WordCountOperator class.</p>
*
@@ -84,5 +87,6 @@ public class WordCountOperator<T> implements Operator
counts = new ArrayList<Integer>();
millis = new ArrayList<Integer>();
}
+
private static final Logger logger = LoggerFactory.getLogger(WordCountOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
index a70aae6..0a880fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
@@ -18,15 +18,16 @@
*/
package com.datatorrent.benchmark.aerospike;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.aerospike.AerospikeTransactionalStore;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+
/**
- *
* Application to benchmark the performance of aerospike output operator.
* The operator was tested on DT cluster and the number of tuples processed
* by the operator per second were around 12,000
@@ -34,16 +35,18 @@ import org.apache.hadoop.conf.Configuration;
* @since 1.0.4
*/
-
-@ApplicationAnnotation(name="AerospikeOutputOperatorBenchmark")
-public class AerospikeOutputBenchmarkApplication implements StreamingApplication {
+@ApplicationAnnotation(name = "AerospikeOutputOperatorBenchmark")
+public class AerospikeOutputBenchmarkApplication implements StreamingApplication
+{
private final String NODE = "127.0.0.1";
private final int PORT = 3000;
private final String NAMESPACE = "test";
private final Locality locality = null;
+
@Override
- public void populateDAG(DAG dag, Configuration conf) {
+ public void populateDAG(DAG dag, Configuration conf)
+ {
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
rand.setMaxvalue(3000);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
index 210e086..f9ee689 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
@@ -23,25 +23,28 @@ import java.util.List;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
-import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
+import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
/**
* <p>AerospikeOutputOperator class.</p>
*
* @since 1.0.4
*/
-public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>{
+public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>
+{
private final String KEYSPACE = "test";
private final String SET_NAME = "Aerospike_Output";
private int id = 0;
+
@Override
protected Key getUpdatedBins(Integer tuple, List<Bin> bins)
- throws AerospikeException {
+ throws AerospikeException
+ {
- Key key = new Key(KEYSPACE,SET_NAME,id++);
- bins.add(new Bin("ID",tuple));
+ Key key = new Key(KEYSPACE, SET_NAME, id++);
+ bins.add(new Bin("ID", tuple));
return key;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index f522396..f74311e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -18,23 +18,22 @@
*/
package com.datatorrent.benchmark.algo;
-
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.stream.Counter;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
/**
* Application to demonstrate PartitionableUniqueCount operator. <br>
* The input operator generate random keys, which is sent to
@@ -63,9 +62,11 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
/* Initialize with three partition to start with */
UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>());
- MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+ MapToKeyHashValuePairConverter<Integer, Integer> converter =
+ dag.addOperator("converter", new MapToKeyHashValuePairConverter());
- dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3));
+ dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<UniqueCounter<Integer>>(3));
dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
uniqCount.setCumulative(false);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
index dead2cd..46d503f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
@@ -19,24 +19,27 @@
package com.datatorrent.benchmark.cassandra;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
/**
- *
- *Application to benchmark the performance of cassandra output operator.
- *The operator was tested on following configuration:
- *Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
- *The number of tuples processed per second were around 20,000
+ * Application to benchmark the performance of cassandra output operator.
+ * The operator was tested on following configuration:
+ * Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
+ * The number of tuples processed per second were around 20,000
*
* @since 1.0.3
*/
-@ApplicationAnnotation(name="CassandraOperatorDemo")
+@ApplicationAnnotation(name = "CassandraOperatorDemo")
public class CassandraOutputBenchmarkApplication implements StreamingApplication
{
private final Locality locality = null;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
index 666746b..592d8a2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
@@ -18,34 +18,36 @@
*/
package com.datatorrent.benchmark.cassandra;
-
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
+import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
/**
* <p>CassandraOutputOperator class.</p>
*
* @since 1.0.3
*/
-public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>{
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>
+{
private int id = 0;
@Override
- protected PreparedStatement getUpdateCommand() {
+ protected PreparedStatement getUpdateCommand()
+ {
String statement = "Insert into test.cassandra_operator(id, result) values (?,?);";
return store.getSession().prepare(statement);
}
@Override
protected Statement setStatementParameters(PreparedStatement updateCommand,
- Integer tuple) throws DriverException {
+ Integer tuple) throws DriverException
+ {
BoundStatement boundStmnt = new BoundStatement(updateCommand);
- return boundStmnt.bind(id++,tuple);
+ return boundStmnt.bind(id++, tuple);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
index 894cb75..ce0821c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
@@ -18,10 +18,12 @@
*/
package com.datatorrent.benchmark.fs;
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import java.util.Arrays;
+
import javax.validation.constraints.Min;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
/**
* This output operator receives
*
@@ -38,19 +40,20 @@ public class FSByteOutputOperator extends AbstractFileOutputOperator<byte[]>
/**
* The file a tuple is written out to is determined by modding the hashcode of the
* tuple by the outputFileCount.
+ *
* @param tuple The input tuple to write out.
* @return The name of the file to write the tuple to.
*/
@Override
protected String getFileName(byte[] tuple)
{
- return ((Integer) (Arrays.hashCode(tuple) % outputFileCount)).toString();
+ return ((Integer)(Arrays.hashCode(tuple) % outputFileCount)).toString();
}
@Override
protected byte[] getBytesForTuple(byte[] tuple)
{
- for(int counter = 0;
+ for (int counter = 0;
counter < tuple.length;
counter++) {
tuple[counter] += 1;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
index 8702ab8..7a63d18 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
@@ -18,17 +18,20 @@
*/
package com.datatorrent.benchmark.fs;
-import com.datatorrent.lib.testbench.RandomWordGenerator;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
+
+import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.commons.lang.mutable.MutableLong;
+import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
/**
* Application used to benchmark HDFS output operator
@@ -38,25 +41,28 @@ import org.apache.hadoop.conf.Configuration;
* @since 0.9.4
*/
-@ApplicationAnnotation(name="HDFSOutputOperatorBenchmarkingApp")
+@ApplicationAnnotation(name = "HDFSOutputOperatorBenchmarkingApp")
public class FSOutputOperatorBenchmark implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
String filePath = "HDFSOutputOperatorBenchmarkingApp/"
- + System.currentTimeMillis();
+ + System.currentTimeMillis();
dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class);
- dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
- dag.getOperatorMeta("wordGenerator").getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
+ dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output)
+ .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
+ dag.getOperatorMeta("wordGenerator").getAttributes()
+ .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator());
hdfsOutputOperator.setFilePath(filePath);
- dag.getOperatorMeta("hdfsOutputOperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+ dag.getOperatorMeta("hdfsOutputOperator").getAttributes()
+ .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
index 60be57d..95fa961 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
@@ -30,16 +30,18 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
-import com.datatorrent.contrib.hive.HiveOperator;
-import com.datatorrent.contrib.hive.HiveStore;
-
import com.datatorrent.api.Context.OperatorContext;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
+
import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
/**
* Application used to benchmark HIVE Insert operator
@@ -79,13 +81,14 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
HiveStore store = new HiveStore();
store.setDatabaseUrl(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- store.setConnectionProperties(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ store.setConnectionProperties(conf.get(
+ "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
store.setFilepath(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
try {
- hiveInitializeDatabase(store, conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
- }
- catch (SQLException ex) {
+ hiveInitializeDatabase(store, conf.get(
+ "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+ } catch (SQLException ex) {
LOG.debug(ex.getMessage());
}
@@ -109,8 +112,9 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
hiveStore.connect();
Statement stmt = hiveStore.getConnection().createStatement();
- stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
- + "STORED AS TEXTFILE ");
+ stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+ + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
+ + "STORED AS TEXTFILE ");
hiveStore.disconnect();
}
@@ -171,8 +175,8 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
for (;
- tupleCounter < tuplesPerWindow;
- tupleCounter++) {
+ tupleCounter < tuplesPerWindow;
+ tupleCounter++) {
String output = "2014-12-1" + random.nextInt(10) + "";
outputString.emit(output);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
index cfbbfc5..98d9ce3 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
@@ -24,23 +24,23 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
-import com.datatorrent.contrib.hive.*;
-
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-
import com.datatorrent.benchmark.RandomMapOutput;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+
/**
* Application used to benchmark HIVE Map Insert operator
* The DAG consists of random Event generator operator that is
@@ -61,12 +61,13 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
{
HiveStore store = new HiveStore();
store.setDatabaseUrl(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- store.setConnectionProperties(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ store.setConnectionProperties(conf.get(
+ "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
store.setFilepath(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
try {
- hiveInitializeMapDatabase(store, conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
- }
- catch (SQLException ex) {
+ hiveInitializeMapDatabase(store, conf.get(
+ "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
+ } catch (SQLException ex) {
LOG.debug(ex.getMessage());
}
dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
@@ -90,13 +91,15 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
/*
* User can create table and specify data columns and partition columns in this function.
*/
- public static void hiveInitializeMapDatabase(HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
+ public static void hiveInitializeMapDatabase(
+ HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
{
hiveStore.connect();
Statement stmt = hiveStore.getConnection().createStatement();
- stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
- + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
- + "STORED AS TEXTFILE ");
+ stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+ + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
+ + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
+ + "STORED AS TEXTFILE ");
hiveStore.disconnect();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 8239ea7..e147ad7 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,11 +18,11 @@
*/
package com.datatorrent.benchmark.kafka;
-import kafka.message.Message;
-
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
+import kafka.message.Message;
+
/**
* This operator emits one constant message for each kafka message received.
* So we can track the throughput by messages emitted per second in the stram platform.
@@ -38,7 +38,7 @@ public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
/**
* The output port on which messages are emitted.
*/
- public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
+ public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
@Override
protected void emitTuple(Message message)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 1126ac1..6353c37 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.benchmark.kafka;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -26,9 +25,6 @@ import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import javax.validation.constraints.Min;
@@ -37,21 +33,27 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
+
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
/**
* This operator keep sending constant messages(1kb each) in {@link #threadNum} threads.
* Messages are distributed evenly to partitions.
* <p></p>
+ *
* @displayName Benchmark Partitionable Kafka Output
* @category Messaging
* @tags output operator
- *
* @since 0.9.3
*/
-public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
+public class BenchmarkPartitionableKafkaOutputOperator implements
+ Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
{
private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
@@ -78,7 +80,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
private int stickyKey = 0;
- private transient Runnable r = new Runnable() {
+ private transient Runnable r = new Runnable()
+ {
Producer<String, String> producer = null;
@@ -101,12 +104,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
}
long k = 0;
- while (k<msgsSecThread || !controlThroughput) {
+ while (k < msgsSecThread || !controlThroughput) {
long key = (stickyKey >= 0 ? stickyKey : k);
k++;
producer.send(new KeyedMessage<String, String>(topic, "" + key, new String(constantMsg)));
- if(k==Long.MAX_VALUE){
- k=0;
+ if (k == Long.MAX_VALUE) {
+ k = 0;
}
}
}
@@ -152,10 +155,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
* {@inheritDoc}
*/
@Override
- public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
+ public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(
+ Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
{
- ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
+ ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions =
+ new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
for (int i = 0; i < partitionCount; i++) {
BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator();
@@ -163,7 +168,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
bpkoo.setTopic(topic);
bpkoo.setBrokerList(brokerList);
bpkoo.setStickyKey(i);
- Partition<BenchmarkPartitionableKafkaOutputOperator> p = new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
+ Partition<BenchmarkPartitionableKafkaOutputOperator> p =
+ new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
newPartitions.add(p);
}
return newPartitions;
@@ -176,20 +182,17 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
logger.info("Activate the benchmark kafka output operator .... ");
constantMsg = new byte[msgSize];
for (int i = 0; i < constantMsg.length; i++) {
- constantMsg[i] = (byte) ('a' + i%26);
+ constantMsg[i] = (byte)('a' + i % 26);
}
-
for (int i = 0; i < threadNum; i++) {
- if(controlThroughput){
+ if (controlThroughput) {
ses.scheduleAtFixedRate(r, 0, 1, TimeUnit.SECONDS);
- }
- else {
+ } else {
ses.submit(r);
}
}
-
}
@Override
@@ -268,7 +271,4 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
this.stickyKey = stickyKey;
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 159ee60..ead6c66 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -18,24 +18,27 @@
*/
package com.datatorrent.benchmark.kafka;
-import com.google.common.collect.Sets;
import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
+
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
-
/**
* The stream app to test the benckmark of kafka
* You can set the property file to make it using either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer}
@@ -43,13 +46,14 @@ import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
*
* @since 0.9.3
*/
-@ApplicationAnnotation(name="KafkaInputBenchmark")
+@ApplicationAnnotation(name = "KafkaInputBenchmark")
public class KafkaInputBenchmark implements StreamingApplication
{
public static class CollectorModule extends BaseOperator
{
- public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() {
+ public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+ {
@Override
public void process(String arg0)
@@ -65,12 +69,10 @@ public class KafkaInputBenchmark implements StreamingApplication
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
-
String type = conf.get("kafka.consumertype", "simple");
KafkaConsumer consumer = null;
-
if (type.equals("highlevel")) {
// Create template high-level consumer
@@ -96,7 +98,6 @@ public class KafkaInputBenchmark implements StreamingApplication
dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
// dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);
-
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
index ca1de48..0dd4352 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.9.3
*/
-@ApplicationAnnotation(name="KafkaOutputBenchmark")
+@ApplicationAnnotation(name = "KafkaOutputBenchmark")
public class KafkaOutputBenchmark implements StreamingApplication
{
@@ -37,7 +37,8 @@ public class KafkaOutputBenchmark implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark");
- BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
+ BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator(
+ "KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
bpkoo.setBrokerList(conf.get("kafka.brokerlist"));
bpkoo.setPartitionCount(2);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
index 1d22613..65601d5 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
@@ -21,7 +21,6 @@ package com.datatorrent.benchmark.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
-
/**
* A simple partitioner class for test purpose
* Key is a int string
@@ -32,12 +31,14 @@ import kafka.utils.VerifiableProperties;
*/
public class KafkaTestPartitioner implements Partitioner
{
- public KafkaTestPartitioner (VerifiableProperties props) {
+ public KafkaTestPartitioner(VerifiableProperties props)
+ {
}
+
@Override
public int partition(Object key, int num_Partitions)
{
- return Integer.parseInt((String)key)%num_Partitions;
+ return Integer.parseInt((String)key) % num_Partitions;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index bc23404..b86cd01 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -18,17 +18,20 @@
*/
package com.datatorrent.benchmark.script;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
import com.datatorrent.benchmark.RandomMapOutput;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.contrib.ruby.RubyOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
/**
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 07ab02e..7c45106 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -169,7 +169,8 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
long countInPeriod = totalCount - lastTotalCount;
long timeInPeriod = System.currentTimeMillis() - lastLogTime;
long totalTime = System.currentTimeMillis() - beginTime;
- logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+ logger.info(
+ "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index ae5ba40..2dc6f0d 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -98,7 +98,8 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
public static class TestGenerator extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+ public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data =
+ new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
int emitBatchSize = 1000;
byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
int rate = 20000;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 74ba658..60a775c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -69,7 +69,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
private ExecMode execMode = ExecMode.INSERT;
private int timeRange = 1000 * 60;
- public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+ public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input =
+ new DefaultInputPort<KeyValPair<byte[], byte[]>>()
{
@Override
public void process(KeyValPair<byte[], byte[]> tuple)
@@ -172,7 +173,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
private final int taskBarrier = 100000;
/**
- * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
+ * This method first send request of get to the state manager,
+ * then handle all the task(get) which already done and update the value.
* @param tuple
*/
private void updateAsync(KeyValPair<byte[], byte[]> tuple)
@@ -251,7 +253,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
long spentTime = now - statisticsBeginTime;
long totalSpentTime = now - applicationBeginTime;
totalTupleCount += tupleCount;
- logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+ logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}",
+ windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
totalTupleCount * 1000 / totalSpentTime);
statisticsBeginTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
index e0ee160..b0b7314 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
@@ -18,13 +18,14 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNullCounter;
-import org.apache.hadoop.conf.Configuration;
/**
*
@@ -56,11 +57,11 @@ public class DevNullCounterBenchmark implements StreamingApplication
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
- // rand.setMinvalue(0);
- // rand.setMaxvalue(999999);
- // rand.setTuplesBlastIntervalMillis(50);
- // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+ // rand.setMinvalue(0);
+ // rand.setMaxvalue(999999);
+ // rand.setTuplesBlastIntervalMillis(50);
+ // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
DevNullCounter oper = dag.addOperator("oper", new DevNullCounter());
dag.getMeta(oper).getMeta(oper.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
index ff6ed76..c716206 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
@@ -35,37 +35,42 @@ public class IntegerOperator implements InputOperator
* Output port which emits integer.
*/
public final transient DefaultOutputPort<Integer> integer_data = new DefaultOutputPort<Integer>();
+
@Override
public void emitTuples()
{
Integer i = 21;
- for(int j=0;j<1000;j++){
- integer_data.emit(i);
+ for (int j = 0; j < 1000; j++) {
+ integer_data.emit(i);
}
}
@Override
public void beginWindow(long windowId)
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void endWindow()
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void setup(OperatorContext context)
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void teardown()
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
index 951b44b..2e5bcf9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.stream.StreamDuplicater;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for StreamDuplicater Operator.
@@ -36,25 +37,25 @@ import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name = "StreamDuplicaterApp")
public class StreamDuplicaterApp implements StreamingApplication
{
- private final Locality locality = null;
- public static final int QUEUE_CAPACITY = 16 * 1024;
+ private final Locality locality = null;
+ public static final int QUEUE_CAPACITY = 16 * 1024;
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
- // rand.setMinvalue(0);
- // rand.setMaxvalue(999999);
- // rand.setTuplesBlastIntervalMillis(50);
- // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+ // rand.setMinvalue(0);
+ // rand.setMaxvalue(999999);
+ // rand.setTuplesBlastIntervalMillis(50);
+ // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
StreamDuplicater stream = dag.addOperator("stream", new StreamDuplicater());
dag.getMeta(stream).getMeta(stream.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("streamdup1", intInput.integer_data, stream.data).setLocality(locality);
DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
DevNull<Integer> dev2 = dag.addOperator("dev2", new DevNull());
- dag.addStream("streamdup2",stream.out1,dev1.data).setLocality(locality);
- dag.addStream("streamdup3",stream.out2,dev2.data).setLocality(locality);
+ dag.addStream("streamdup2", stream.out1, dev1.data).setLocality(locality);
+ dag.addStream("streamdup3", stream.out2, dev2.data).setLocality(locality);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
index d90320c..bb1d081 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.benchmark.WordCountOperator;
import com.datatorrent.lib.stream.StreamMerger;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for StreamMerge Operator.
@@ -46,7 +47,7 @@ public class StreamMergeApp implements StreamingApplication
StreamMerger stream = dag.addOperator("stream", new StreamMerger());
dag.getMeta(stream).getMeta(stream.data1).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.getMeta(stream).getMeta(stream.data2).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
- dag.addStream("streammerge1", intInput.integer_data, stream.data1,stream.data2).setLocality(locality);
+ dag.addStream("streammerge1", intInput.integer_data, stream.data1, stream.data2).setLocality(locality);
WordCountOperator<Integer> counter = dag.addOperator("counter", new WordCountOperator<Integer>());
dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
index 419de18..b1ddbee 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
@@ -18,6 +18,11 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,9 +30,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventClassifier Operator.
@@ -75,7 +77,8 @@ public class EventClassifierApp implements StreamingApplication
eventClassifier.setKeyMap(keymap);
eventClassifier.setOperationReplace();
eventClassifier.setKeyWeights(wmap);
- dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("eventtest1", hmapOper.hmap_data, eventClassifier.event).setLocality(locality);
DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
dag.addStream("eventtest2", eventClassifier.data, dev.data).setLocality(locality);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
index a49b30e..5fe478b 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
@@ -18,6 +18,10 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -26,8 +30,6 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.benchmark.WordCountOperator;
import com.datatorrent.benchmark.stream.IntegerOperator;
import com.datatorrent.lib.testbench.EventClassifierNumberToHashDouble;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventClassifierNumberToHashDouble Operator.
@@ -44,10 +46,14 @@ public class EventClassifierNumberToHashDoubleApp implements StreamingApplicatio
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- WordCountOperator<HashMap<String, Double>> counterString = dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
- dag.getMeta(counterString).getMeta(counterString.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
- EventClassifierNumberToHashDouble eventClassify = dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
- dag.getMeta(eventClassify).getMeta(eventClassify.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ WordCountOperator<HashMap<String, Double>> counterString =
+ dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
+ dag.getMeta(counterString).getMeta(counterString.input).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ EventClassifierNumberToHashDouble eventClassify =
+ dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
+ dag.getMeta(eventClassify).getMeta(eventClassify.data)
+ .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
dag.addStream("eventclassifier2", intInput.integer_data, eventClassify.event).setLocality(locality);
dag.addStream("eventclassifier1", eventClassify.data, counterString.input).setLocality(locality);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
index 3025c7e..8f28ae6 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
@@ -18,6 +18,10 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,8 +29,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventGenerator;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventGenerator Operator.
@@ -44,7 +46,8 @@ public class EventGeneratorApp implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
EventGenerator eventGenerator = dag.addOperator("eventGenerator", new EventGenerator());
- dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
DevNull<String> devString = dag.addOperator("devString", new DevNull());
DevNull<HashMap<String, Double>> devMap = dag.addOperator("devMap", new DevNull());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
index ea05d07..e562224 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventIncrementer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventIncrementer Operator.
@@ -39,6 +41,7 @@ public class EventIncrementerApp implements StreamingApplication
{
private final Locality locality = null;
public static final int QUEUE_CAPACITY = 16 * 1024;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
@@ -55,15 +58,14 @@ public class EventIncrementerApp implements StreamingApplication
eventInc.setKeylimits(keys, low, high);
eventInc.setDelta(1);
HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
- dag.addStream("eventIncInput1",hmapOper.hmapList_data,eventInc.seed);
- dag.addStream("eventIncInput2",hmapOper.hmapMap_data,eventInc.increment);
- DevNull<HashMap<String,Integer>> dev1= dag.addOperator("dev1", new DevNull());
- DevNull<HashMap<String,String>> dev2= dag.addOperator("dev2", new DevNull());
- dag.addStream("eventIncOutput1",eventInc.count,dev1.data).setLocality(locality);
- dag.addStream("eventIncOutput2",eventInc.data,dev2.data).setLocality(locality);
+ dag.addStream("eventIncInput1", hmapOper.hmapList_data, eventInc.seed);
+ dag.addStream("eventIncInput2", hmapOper.hmapMap_data, eventInc.increment);
+ DevNull<HashMap<String, Integer>> dev1 = dag.addOperator("dev1", new DevNull());
+ DevNull<HashMap<String, String>> dev2 = dag.addOperator("dev2", new DevNull());
+ dag.addStream("eventIncOutput1", eventInc.count, dev1.data).setLocality(locality);
+ dag.addStream("eventIncOutput2", eventInc.data, dev2.data).setLocality(locality);
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
index 915e6f0..ea2943f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.FilterClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for FilterClassifier Operator.
@@ -39,6 +41,7 @@ public class FilterClassifierApp implements StreamingApplication
{
private final Locality locality = null;
public static final int QUEUE_CAPACITY = 16 * 1024;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
@@ -80,9 +83,9 @@ public class FilterClassifierApp implements StreamingApplication
filter.setTotalFilter(100);
HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
- DevNull<HashMap<String,Double>> dev = dag.addOperator("dev", new DevNull());
- dag.addStream("filter1",hmapOper.hmap_data,filter.data).setLocality(locality);
- dag.addStream("filer2",filter.filter,dev.data).setLocality(locality);
+ DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
+ dag.addStream("filter1", hmapOper.hmap_data, filter.data).setLocality(locality);
+ dag.addStream("filer2", filter.filter, dev.data).setLocality(locality);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
index c3d996e..52c0bed 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.FilteredEventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for FilteredEventClassifier Operator.