You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/01/22 19:24:06 UTC
[2/3] apex-malhar git commit: fixing all checkstyle violations,
delete maxAllowedViolations from pom
fixing all checkstyle violations,
delete maxAllowedViolations from pom
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5528a4c6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5528a4c6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5528a4c6
Branch: refs/heads/master
Commit: 5528a4c639a87dbfaba4a2bae68ac99971c66224
Parents: 4cbbb75
Author: Apex Dev <de...@apex.apache.org>
Authored: Wed Jan 18 14:43:33 2017 -0800
Committer: Oliver W <ol...@datatorrent.com>
Committed: Fri Jan 20 11:12:20 2017 -0800
----------------------------------------------------------------------
benchmark/pom.xml | 8 ----
.../datatorrent/benchmark/ApplicationFixed.java | 15 +++---
.../com/datatorrent/benchmark/Benchmark.java | 17 +++----
.../benchmark/CouchBaseAppInput.java | 6 +--
.../benchmark/CouchBaseAppOutput.java | 8 ++--
.../benchmark/CouchBaseInputOperator.java | 17 ++++---
.../benchmark/FixedTuplesInputOperator.java | 11 +++--
.../datatorrent/benchmark/RandomMapOutput.java | 40 +++++++++-------
.../benchmark/RandomWordInputModule.java | 11 +++--
.../benchmark/WordCountOperator.java | 10 ++--
.../AerospikeOutputBenchmarkApplication.java | 17 ++++---
.../aerospike/AerospikeOutputOperator.java | 13 ++++--
.../UniqueValueCountBenchmarkApplication.java | 21 +++++----
.../CassandraOutputBenchmarkApplication.java | 17 ++++---
.../cassandra/CassandraOutputOperator.java | 14 +++---
.../benchmark/fs/FSByteOutputOperator.java | 9 ++--
.../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++----
.../hive/HiveInsertBenchmarkingApp.java | 28 ++++++-----
.../hive/HiveMapInsertBenchmarkingApp.java | 31 +++++++------
.../kafka/BenchmarkKafkaInputOperator.java | 6 +--
...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++---------
.../benchmark/kafka/KafkaInputBenchmark.java | 23 ++++-----
.../benchmark/kafka/KafkaOutputBenchmark.java | 5 +-
.../benchmark/kafka/KafkaTestPartitioner.java | 7 +--
.../RubyOperatorBenchmarkApplication.java | 11 +++--
.../spillable/SpillableTestOperator.java | 3 +-
.../state/ManagedStateBenchmarkApp.java | 3 +-
.../benchmark/state/StoreOperator.java | 9 ++--
.../stream/DevNullCounterBenchmark.java | 13 +++---
.../benchmark/stream/IntegerOperator.java | 17 ++++---
.../benchmark/stream/StreamDuplicaterApp.java | 21 +++++----
.../benchmark/stream/StreamMergeApp.java | 5 +-
.../benchmark/testbench/EventClassifierApp.java | 11 +++--
.../EventClassifierNumberToHashDoubleApp.java | 18 ++++---
.../benchmark/testbench/EventGeneratorApp.java | 9 ++--
.../testbench/EventIncrementerApp.java | 22 +++++----
.../testbench/FilterClassifierApp.java | 15 +++---
.../testbench/FilteredEventClassifierApp.java | 8 ++--
.../benchmark/testbench/HashMapOperator.java | 37 +++++++++------
.../testbench/RandomEventGeneratorApp.java | 8 ++--
.../testbench/SeedEventGeneratorApp.java | 14 ++++--
.../testbench/ThroughputCounterApp.java | 13 ++++--
.../AbstractWindowedOperatorBenchmarkApp.java | 10 ++--
.../KeyedWindowedOperatorBenchmarkApp.java | 9 ++--
.../window/WindowedOperatorBenchmarkApp.java | 6 ++-
.../benchmark/ApplicationFixedTest.java | 17 ++++---
.../datatorrent/benchmark/BenchmarkTest.java | 3 +-
.../benchmark/CouchBaseBenchmarkTest.java | 13 +++---
.../benchmark/accumulo/AccumuloApp.java | 17 ++++---
.../benchmark/accumulo/AccumuloAppTest.java | 8 ++--
.../aerospike/AerospikeBenchmarkAppTest.java | 7 +--
.../algo/UniqueValueCountBenchmarkTest.java | 6 ++-
.../cassandra/CassandraApplicatonTest.java | 8 ++--
.../benchmark/hbase/HBaseApplicationTest.java | 9 ++--
.../hbase/HBaseCsvMappingApplication.java | 7 +--
.../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------
.../benchmark/hive/HiveMapBenchmarkTest.java | 38 ++++++++-------
.../kafka/KafkaInputBenchmarkTest.java | 3 +-
.../kafka/KafkaOutputBenchmarkTest.java | 4 +-
.../benchmark/memsql/MemsqlInputBenchmark.java | 21 +++++----
.../memsql/MemsqlInputBenchmarkTest.java | 49 ++++++++++++--------
.../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++-----
.../memsql/MemsqlOutputBenchmarkTest.java | 21 +++++----
.../script/RubyOperatorBenchmarkAppTest.java | 7 ++-
.../spillable/SpillableDSBenchmarkTest.java | 6 +--
.../state/ManagedStateBenchmarkAppTest.java | 3 --
.../testbench/EventClassifierAppTest.java | 12 +++--
...ventClassifierNumberToHashDoubleAppTest.java | 12 +++--
.../testbench/EventGeneratorAppTest.java | 12 +++--
.../testbench/EventIncrementerAppTest.java | 12 +++--
.../testbench/FilterClassifierAppTest.java | 10 ++--
.../FilteredEventClassifierAppTest.java | 12 +++--
.../testbench/ThroughputCounterAppTest.java | 12 +++--
.../util/serde/GenericSerdePerformanceTest.java | 7 +--
74 files changed, 608 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index b2e0981..4bbd5ac 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -143,14 +143,6 @@
<skip>true</skip>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <maxAllowedViolations>281</maxAllowedViolations>
- <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
- </configuration>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
index 53f01fc..aa10eea 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java
@@ -18,13 +18,15 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
* Example of application configuration in Java.
@@ -34,7 +36,7 @@ import org.apache.hadoop.conf.Configuration;
*
* @since 0.3.2
*/
-@ApplicationAnnotation(name="PerformanceBenchmarkForFixedNumberOfTuples")
+@ApplicationAnnotation(name = "PerformanceBenchmarkForFixedNumberOfTuples")
public class ApplicationFixed implements StreamingApplication
{
private final Locality locality = null;
@@ -44,7 +46,8 @@ public class ApplicationFixed implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
FixedTuplesInputOperator wordGenerator = dag.addOperator("WordGenerator", FixedTuplesInputOperator.class);
- dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
wordGenerator.setCount(500000);
WordCountOperator<byte[]> counter = dag.addOperator("Counter", new WordCountOperator<byte[]>());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
index 5649914..d8d51b8 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java
@@ -52,10 +52,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
* @since 0.9.0
*/
-@ApplicationAnnotation(name="PerformanceBenchmarkingApp")
+@ApplicationAnnotation(name = "PerformanceBenchmarkingApp")
public abstract class Benchmark
{
- static abstract class AbstractApplication implements StreamingApplication
+ abstract static class AbstractApplication implements StreamingApplication
{
public static final int QUEUE_CAPACITY = 32 * 1024;
@@ -63,7 +63,8 @@ public abstract class Benchmark
public void populateDAG(DAG dag, Configuration conf)
{
RandomWordInputModule wordGenerator = dag.addOperator("wordGenerator", RandomWordInputModule.class);
- dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
WordCountOperator<byte[]> counter = dag.addOperator("counter", new WordCountOperator<byte[]>());
dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
@@ -77,7 +78,7 @@ public abstract class Benchmark
/**
* Let the engine decide how to best place the 2 operators.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkNoLocality")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkNoLocality")
public static class NoLocality extends AbstractApplication
{
@Override
@@ -92,7 +93,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same Rack.
* The operators are requested to be deployed on different machines.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkRackLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkRackLocal")
public static class RackLocal extends AbstractApplication
{
@Override
@@ -107,7 +108,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same node.
* The operators are requested to be deployed as different processes within the same machine.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkNodeLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkNodeLocal")
public static class NodeLocal extends AbstractApplication
{
@Override
@@ -122,7 +123,7 @@ public abstract class Benchmark
* Place the 2 operators so that they are in the same container.
* The operators are deployed as different threads in the same process.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkContainerLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkContainerLocal")
public static class ContainerLocal extends AbstractApplication
{
@Override
@@ -136,7 +137,7 @@ public abstract class Benchmark
/**
* Place the 2 operators so that they are in the same thread.
*/
- @ApplicationAnnotation(name="PerformanceBenchmarkThreadLocal")
+ @ApplicationAnnotation(name = "PerformanceBenchmarkThreadLocal")
public static class ThreadLocal extends AbstractApplication
{
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
index 6096530..bf5b876 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java
@@ -18,14 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
/**
- *
* Application to benchmark the performance of couchbase input operator.
* The number of tuples processed per second were around 9000.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index f789d08..4f12791 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -18,12 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
/**
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
index 923b588..8ae0a94 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java
@@ -18,12 +18,14 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
-import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
import java.util.ArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator;
+import com.datatorrent.contrib.couchbase.CouchBaseWindowStore;
+
/**
* <p>CouchBaseInputOperator class.</p>
*
@@ -32,14 +34,15 @@ import org.slf4j.LoggerFactory;
public class CouchBaseInputOperator extends AbstractCouchBaseInputOperator<String>
{
private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class);
+
@Override
public String getTuple(Object object)
{
- if(object!=null)
- return object.toString();
- else{
- logger.info("Object returned is null");
- return "null";
+ if (object != null) {
+ return object.toString();
+ } else {
+ logger.info("Object returned is null");
+ return "null";
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
index ad7f8c1..f2582bd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java
@@ -18,14 +18,15 @@
*/
package com.datatorrent.benchmark;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
+import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
/**
* <p>FixedTuplesInputOperator class.</p>
@@ -44,7 +45,7 @@ public class FixedTuplesInputOperator implements InputOperator
{
if (firstTime) {
long start = System.currentTimeMillis();
- for (int i = count; i-- > 0;) {
+ for (int i = count; i-- > 0; ) {
output.emit(new byte[64]);
}
firstTime = false;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
index 106bd79..3342771 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java
@@ -19,20 +19,23 @@
package com.datatorrent.benchmark;
import java.util.HashMap;
-import com.datatorrent.common.util.BaseOperator;
+
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
/**
* Operator that outputs random values in a map.
*
* @since 1.0.4
*/
-public class RandomMapOutput extends BaseOperator {
+public class RandomMapOutput extends BaseOperator
+{
- public final transient DefaultOutputPort<HashMap<String, Object>> map_data = new DefaultOutputPort<HashMap<String, Object>>();
+ public final transient DefaultOutputPort<HashMap<String, Object>> map_data =
+ new DefaultOutputPort<HashMap<String, Object>>();
public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
- {
+ {
@Override
public void process(Integer tuple)
{
@@ -40,22 +43,25 @@ public class RandomMapOutput extends BaseOperator {
map.put(key, tuple);
RandomMapOutput.this.process(map);
}
- };
+ };
- private String key;
+ private String key;
- public String getKey() {
- return key;
- }
+ public String getKey()
+ {
+ return key;
+ }
- public void setKey(String key) {
- this.key = key;
- }
+ public void setKey(String key)
+ {
+ this.key = key;
+ }
- public void process(HashMap<String, Object> tuple) {
+ public void process(HashMap<String, Object> tuple)
+ {
- if (map_data.isConnected()) {
- map_data.emit(tuple);
- }
- }
+ if (map_data.isConnected()) {
+ map_data.emit(tuple);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
index 11c7568..7d02de2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java
@@ -18,10 +18,12 @@
*/
package com.datatorrent.benchmark;
+import javax.validation.constraints.Min;
+
+import com.datatorrent.api.Context.OperatorContext;
+
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import javax.validation.constraints.Min;
/**
* <p>
@@ -87,7 +89,6 @@ public class RandomWordInputModule implements InputOperator
return emitSameTuple;
}
-
/**
* Emits byte array of specified size.
* Emits either the same byte array or creates new byte array every time
@@ -103,11 +104,11 @@ public class RandomWordInputModule implements InputOperator
final boolean EMIT_SAME_TUPLE_COPY = emitSameTuple;
if (firstTime) {
if (EMIT_SAME_TUPLE_COPY) {
- for (int i = count--; i-- > 0;) {
+ for (int i = count--; i-- > 0; ) {
output.emit(sameTupleArray);
}
} else {
- for (int i = count--; i-- > 0;) {
+ for (int i = count--; i-- > 0; ) {
output.emit(new byte[TUPLE_SIZE_COPY]);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
index 098ed42..6e91482 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java
@@ -21,14 +21,17 @@ package com.datatorrent.benchmark;
/*
* To change this template, choose Tools | Templates and open the template in the editor.
*/
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.OperatorContext;
import java.util.ArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
/**
* <p>WordCountOperator class.</p>
*
@@ -84,5 +87,6 @@ public class WordCountOperator<T> implements Operator
counts = new ArrayList<Integer>();
millis = new ArrayList<Integer>();
}
+
private static final Logger logger = LoggerFactory.getLogger(WordCountOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
index a70aae6..0a880fd 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java
@@ -18,15 +18,16 @@
*/
package com.datatorrent.benchmark.aerospike;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.aerospike.AerospikeTransactionalStore;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+
/**
- *
* Application to benchmark the performance of aerospike output operator.
* The operator was tested on DT cluster and the number of tuples processed
* by the operator per second were around 12,000
@@ -34,16 +35,18 @@ import org.apache.hadoop.conf.Configuration;
* @since 1.0.4
*/
-
-@ApplicationAnnotation(name="AerospikeOutputOperatorBenchmark")
-public class AerospikeOutputBenchmarkApplication implements StreamingApplication {
+@ApplicationAnnotation(name = "AerospikeOutputOperatorBenchmark")
+public class AerospikeOutputBenchmarkApplication implements StreamingApplication
+{
private final String NODE = "127.0.0.1";
private final int PORT = 3000;
private final String NAMESPACE = "test";
private final Locality locality = null;
+
@Override
- public void populateDAG(DAG dag, Configuration conf) {
+ public void populateDAG(DAG dag, Configuration conf)
+ {
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
rand.setMaxvalue(3000);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
index 210e086..f9ee689 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java
@@ -23,25 +23,28 @@ import java.util.List;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
-import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
+import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator;
/**
* <p>AerospikeOutputOperator class.</p>
*
* @since 1.0.4
*/
-public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>{
+public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer>
+{
private final String KEYSPACE = "test";
private final String SET_NAME = "Aerospike_Output";
private int id = 0;
+
@Override
protected Key getUpdatedBins(Integer tuple, List<Bin> bins)
- throws AerospikeException {
+ throws AerospikeException
+ {
- Key key = new Key(KEYSPACE,SET_NAME,id++);
- bins.add(new Bin("ID",tuple));
+ Key key = new Key(KEYSPACE, SET_NAME, id++);
+ bins.add(new Bin("ID", tuple));
return key;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index f522396..f74311e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -18,23 +18,22 @@
*/
package com.datatorrent.benchmark.algo;
-
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.stream.Counter;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
/**
* Application to demonstrate PartitionableUniqueCount operator. <br>
* The input operator generate random keys, which is sent to
@@ -63,9 +62,11 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
/* Initialize with three partition to start with */
UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>());
- MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+ MapToKeyHashValuePairConverter<Integer, Integer> converter =
+ dag.addOperator("converter", new MapToKeyHashValuePairConverter());
- dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3));
+ dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<UniqueCounter<Integer>>(3));
dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
uniqCount.setCumulative(false);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
index dead2cd..46d503f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java
@@ -19,24 +19,27 @@
package com.datatorrent.benchmark.cassandra;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
/**
- *
- *Application to benchmark the performance of cassandra output operator.
- *The operator was tested on following configuration:
- *Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
- *The number of tuples processed per second were around 20,000
+ * Application to benchmark the performance of cassandra output operator.
+ * The operator was tested on following configuration:
+ * Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram
+ * The number of tuples processed per second were around 20,000
*
* @since 1.0.3
*/
-@ApplicationAnnotation(name="CassandraOperatorDemo")
+@ApplicationAnnotation(name = "CassandraOperatorDemo")
public class CassandraOutputBenchmarkApplication implements StreamingApplication
{
private final Locality locality = null;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
index 666746b..592d8a2 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
@@ -18,34 +18,36 @@
*/
package com.datatorrent.benchmark.cassandra;
-
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
+import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
/**
* <p>CassandraOutputOperator class.</p>
*
* @since 1.0.3
*/
-public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>{
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>
+{
private int id = 0;
@Override
- protected PreparedStatement getUpdateCommand() {
+ protected PreparedStatement getUpdateCommand()
+ {
String statement = "Insert into test.cassandra_operator(id, result) values (?,?);";
return store.getSession().prepare(statement);
}
@Override
protected Statement setStatementParameters(PreparedStatement updateCommand,
- Integer tuple) throws DriverException {
+ Integer tuple) throws DriverException
+ {
BoundStatement boundStmnt = new BoundStatement(updateCommand);
- return boundStmnt.bind(id++,tuple);
+ return boundStmnt.bind(id++, tuple);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
index 894cb75..ce0821c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java
@@ -18,10 +18,12 @@
*/
package com.datatorrent.benchmark.fs;
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import java.util.Arrays;
+
import javax.validation.constraints.Min;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
/**
* This output operator receives
*
@@ -38,19 +40,20 @@ public class FSByteOutputOperator extends AbstractFileOutputOperator<byte[]>
/**
* The file a tuple is written out to is determined by modding the hashcode of the
* tuple by the outputFileCount.
+ *
* @param tuple The input tuple to write out.
* @return The name of the file to write the tuple to.
*/
@Override
protected String getFileName(byte[] tuple)
{
- return ((Integer) (Arrays.hashCode(tuple) % outputFileCount)).toString();
+ return ((Integer)(Arrays.hashCode(tuple) % outputFileCount)).toString();
}
@Override
protected byte[] getBytesForTuple(byte[] tuple)
{
- for(int counter = 0;
+ for (int counter = 0;
counter < tuple.length;
counter++) {
tuple[counter] += 1;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
index 8702ab8..7a63d18 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java
@@ -18,17 +18,20 @@
*/
package com.datatorrent.benchmark.fs;
-import com.datatorrent.lib.testbench.RandomWordGenerator;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
+
+import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.commons.lang.mutable.MutableLong;
+import com.datatorrent.lib.counters.BasicCounters;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
/**
* Application used to benchmark HDFS output operator
@@ -38,25 +41,28 @@ import org.apache.hadoop.conf.Configuration;
* @since 0.9.4
*/
-@ApplicationAnnotation(name="HDFSOutputOperatorBenchmarkingApp")
+@ApplicationAnnotation(name = "HDFSOutputOperatorBenchmarkingApp")
public class FSOutputOperatorBenchmark implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
String filePath = "HDFSOutputOperatorBenchmarkingApp/"
- + System.currentTimeMillis();
+ + System.currentTimeMillis();
dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class);
- dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
- dag.getOperatorMeta("wordGenerator").getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
+ dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output)
+ .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000);
+ dag.getOperatorMeta("wordGenerator").getAttributes()
+ .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1);
FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator());
hdfsOutputOperator.setFilePath(filePath);
- dag.getOperatorMeta("hdfsOutputOperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
+ dag.getOperatorMeta("hdfsOutputOperator").getAttributes()
+ .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>());
dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
index 60be57d..95fa961 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java
@@ -30,16 +30,18 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
-import com.datatorrent.contrib.hive.HiveOperator;
-import com.datatorrent.contrib.hive.HiveStore;
-
import com.datatorrent.api.Context.OperatorContext;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
+
import com.datatorrent.api.StreamingApplication;
+
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
/**
* Application used to benchmark HIVE Insert operator
@@ -79,13 +81,14 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
HiveStore store = new HiveStore();
store.setDatabaseUrl(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- store.setConnectionProperties(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ store.setConnectionProperties(conf.get(
+ "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
store.setFilepath(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
try {
- hiveInitializeDatabase(store, conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
- }
- catch (SQLException ex) {
+ hiveInitializeDatabase(store, conf.get(
+ "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+ } catch (SQLException ex) {
LOG.debug(ex.getMessage());
}
@@ -109,8 +112,9 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
hiveStore.connect();
Statement stmt = hiveStore.getConnection().createStatement();
- stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
- + "STORED AS TEXTFILE ");
+ stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+ + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
+ + "STORED AS TEXTFILE ");
hiveStore.disconnect();
}
@@ -171,8 +175,8 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication
{
for (;
- tupleCounter < tuplesPerWindow;
- tupleCounter++) {
+ tupleCounter < tuplesPerWindow;
+ tupleCounter++) {
String output = "2014-12-1" + random.nextInt(10) + "";
outputString.emit(output);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
index cfbbfc5..98d9ce3 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java
@@ -24,23 +24,23 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
-import com.datatorrent.contrib.hive.*;
-
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-
import com.datatorrent.benchmark.RandomMapOutput;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+
/**
* Application used to benchmark HIVE Map Insert operator
* The DAG consists of random Event generator operator that is
@@ -61,12 +61,13 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
{
HiveStore store = new HiveStore();
store.setDatabaseUrl(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
- store.setConnectionProperties(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+ store.setConnectionProperties(conf.get(
+ "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
store.setFilepath(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
try {
- hiveInitializeMapDatabase(store, conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
- }
- catch (SQLException ex) {
+ hiveInitializeMapDatabase(store, conf.get(
+ "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":");
+ } catch (SQLException ex) {
LOG.debug(ex.getMessage());
}
dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
@@ -90,13 +91,15 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication
/*
* User can create table and specify data columns and partition columns in this function.
*/
- public static void hiveInitializeMapDatabase(HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
+ public static void hiveInitializeMapDatabase(
+ HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException
{
hiveStore.connect();
Statement stmt = hiveStore.getConnection().createStatement();
- stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
- + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
- + "STORED AS TEXTFILE ");
+ stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename
+ + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n"
+ + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
+ + "STORED AS TEXTFILE ");
hiveStore.disconnect();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 8239ea7..e147ad7 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,11 +18,11 @@
*/
package com.datatorrent.benchmark.kafka;
-import kafka.message.Message;
-
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
+import kafka.message.Message;
+
/**
* This operator emits one constant message for each kafka message received.
* So we can track the throughput by messages emitted per second in the stram platform.
@@ -38,7 +38,7 @@ public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
/**
* The output port on which messages are emitted.
*/
- public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
+ public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
@Override
protected void emitTuple(Message message)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 1126ac1..6353c37 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.benchmark.kafka;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -26,9 +25,6 @@ import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import javax.validation.constraints.Min;
@@ -37,21 +33,27 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
+
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
/**
* This operator keeps sending constant messages (1kb each) in {@link #threadNum} threads.
* Messages are distributed evenly to partitions.
* <p></p>
+ *
* @displayName Benchmark Partitionable Kafka Output
* @category Messaging
* @tags output operator
- *
* @since 0.9.3
*/
-public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
+public class BenchmarkPartitionableKafkaOutputOperator implements
+ Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
{
private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
@@ -78,7 +80,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
private int stickyKey = 0;
- private transient Runnable r = new Runnable() {
+ private transient Runnable r = new Runnable()
+ {
Producer<String, String> producer = null;
@@ -101,12 +104,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
}
long k = 0;
- while (k<msgsSecThread || !controlThroughput) {
+ while (k < msgsSecThread || !controlThroughput) {
long key = (stickyKey >= 0 ? stickyKey : k);
k++;
producer.send(new KeyedMessage<String, String>(topic, "" + key, new String(constantMsg)));
- if(k==Long.MAX_VALUE){
- k=0;
+ if (k == Long.MAX_VALUE) {
+ k = 0;
}
}
}
@@ -152,10 +155,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
* {@inheritDoc}
*/
@Override
- public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
+ public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(
+ Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context)
{
- ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
+ ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions =
+ new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);
for (int i = 0; i < partitionCount; i++) {
BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator();
@@ -163,7 +168,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
bpkoo.setTopic(topic);
bpkoo.setBrokerList(brokerList);
bpkoo.setStickyKey(i);
- Partition<BenchmarkPartitionableKafkaOutputOperator> p = new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
+ Partition<BenchmarkPartitionableKafkaOutputOperator> p =
+ new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo);
newPartitions.add(p);
}
return newPartitions;
@@ -176,20 +182,17 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
logger.info("Activate the benchmark kafka output operator .... ");
constantMsg = new byte[msgSize];
for (int i = 0; i < constantMsg.length; i++) {
- constantMsg[i] = (byte) ('a' + i%26);
+ constantMsg[i] = (byte)('a' + i % 26);
}
-
for (int i = 0; i < threadNum; i++) {
- if(controlThroughput){
+ if (controlThroughput) {
ses.scheduleAtFixedRate(r, 0, 1, TimeUnit.SECONDS);
- }
- else {
+ } else {
ses.submit(r);
}
}
-
}
@Override
@@ -268,7 +271,4 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
this.stickyKey = stickyKey;
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 159ee60..ead6c66 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -18,24 +18,27 @@
*/
package com.datatorrent.benchmark.kafka;
-import com.google.common.collect.Sets;
import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
+
+import com.datatorrent.api.DAG;
+
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
-
/**
* The stream app to test the benchmark of kafka
* You can set the property file to make it use either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer}
@@ -43,13 +46,14 @@ import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
*
* @since 0.9.3
*/
-@ApplicationAnnotation(name="KafkaInputBenchmark")
+@ApplicationAnnotation(name = "KafkaInputBenchmark")
public class KafkaInputBenchmark implements StreamingApplication
{
public static class CollectorModule extends BaseOperator
{
- public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() {
+ public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
+ {
@Override
public void process(String arg0)
@@ -65,12 +69,10 @@ public class KafkaInputBenchmark implements StreamingApplication
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
-
String type = conf.get("kafka.consumertype", "simple");
KafkaConsumer consumer = null;
-
if (type.equals("highlevel")) {
// Create template high-level consumer
@@ -96,7 +98,6 @@ public class KafkaInputBenchmark implements StreamingApplication
dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
// dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);
-
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
index ca1de48..0dd4352 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
*
* @since 0.9.3
*/
-@ApplicationAnnotation(name="KafkaOutputBenchmark")
+@ApplicationAnnotation(name = "KafkaOutputBenchmark")
public class KafkaOutputBenchmark implements StreamingApplication
{
@@ -37,7 +37,8 @@ public class KafkaOutputBenchmark implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark");
- BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
+ BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator(
+ "KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
bpkoo.setBrokerList(conf.get("kafka.brokerlist"));
bpkoo.setPartitionCount(2);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
index 1d22613..65601d5 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java
@@ -21,7 +21,6 @@ package com.datatorrent.benchmark.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
-
/**
* A simple partitioner class for test purposes
* Key is an int string
@@ -32,12 +31,14 @@ import kafka.utils.VerifiableProperties;
*/
public class KafkaTestPartitioner implements Partitioner
{
- public KafkaTestPartitioner (VerifiableProperties props) {
+ public KafkaTestPartitioner(VerifiableProperties props)
+ {
}
+
@Override
public int partition(Object key, int num_Partitions)
{
- return Integer.parseInt((String)key)%num_Partitions;
+ return Integer.parseInt((String)key) % num_Partitions;
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index bc23404..b86cd01 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -18,17 +18,20 @@
*/
package com.datatorrent.benchmark.script;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG.Locality;
+
+import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+
import com.datatorrent.benchmark.RandomMapOutput;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.contrib.ruby.RubyOperator;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
/**
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
index 07ab02e..7c45106 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -169,7 +169,8 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec
long countInPeriod = totalCount - lastTotalCount;
long timeInPeriod = System.currentTimeMillis() - lastLogTime;
long totalTime = System.currentTimeMillis() - beginTime;
- logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
+ logger.info(
+ "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index ae5ba40..2dc6f0d 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -98,7 +98,8 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
public static class TestGenerator extends BaseOperator implements InputOperator
{
- public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
+ public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data =
+ new DefaultOutputPort<KeyValPair<byte[], byte[]>>();
int emitBatchSize = 1000;
byte[] val = ByteBuffer.allocate(1000).putLong(1234).array();
int rate = 20000;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 74ba658..60a775c 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -69,7 +69,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
private ExecMode execMode = ExecMode.INSERT;
private int timeRange = 1000 * 60;
- public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
+ public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input =
+ new DefaultInputPort<KeyValPair<byte[], byte[]>>()
{
@Override
public void process(KeyValPair<byte[], byte[]> tuple)
@@ -172,7 +173,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
private final int taskBarrier = 100000;
/**
- * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
+ * This method first sends a get request to the state manager,
+ * then handles all the get tasks that are already done and updates the value.
* @param tuple
*/
private void updateAsync(KeyValPair<byte[], byte[]> tuple)
@@ -251,7 +253,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
long spentTime = now - statisticsBeginTime;
long totalSpentTime = now - applicationBeginTime;
totalTupleCount += tupleCount;
- logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
+ logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}",
+ windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
totalTupleCount * 1000 / totalSpentTime);
statisticsBeginTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
index e0ee160..b0b7314 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java
@@ -18,13 +18,14 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNullCounter;
-import org.apache.hadoop.conf.Configuration;
/**
*
@@ -56,11 +57,11 @@ public class DevNullCounterBenchmark implements StreamingApplication
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
- // rand.setMinvalue(0);
- // rand.setMaxvalue(999999);
- // rand.setTuplesBlastIntervalMillis(50);
- // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+ // rand.setMinvalue(0);
+ // rand.setMaxvalue(999999);
+ // rand.setTuplesBlastIntervalMillis(50);
+ // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
DevNullCounter oper = dag.addOperator("oper", new DevNullCounter());
dag.getMeta(oper).getMeta(oper.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
index ff6ed76..c716206 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java
@@ -35,37 +35,42 @@ public class IntegerOperator implements InputOperator
* Output port which emits integer.
*/
public final transient DefaultOutputPort<Integer> integer_data = new DefaultOutputPort<Integer>();
+
@Override
public void emitTuples()
{
Integer i = 21;
- for(int j=0;j<1000;j++){
- integer_data.emit(i);
+ for (int j = 0; j < 1000; j++) {
+ integer_data.emit(i);
}
}
@Override
public void beginWindow(long windowId)
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void endWindow()
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void setup(OperatorContext context)
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
@Override
public void teardown()
{
- //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+ //throw new UnsupportedOperationException("Not supported yet.");
+ // To change body of generated methods, choose Tools | Templates.
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
index 951b44b..2e5bcf9 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.stream.StreamDuplicater;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for StreamDuplicater Operator.
@@ -36,25 +37,25 @@ import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name = "StreamDuplicaterApp")
public class StreamDuplicaterApp implements StreamingApplication
{
- private final Locality locality = null;
- public static final int QUEUE_CAPACITY = 16 * 1024;
+ private final Locality locality = null;
+ public static final int QUEUE_CAPACITY = 16 * 1024;
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
- // rand.setMinvalue(0);
- // rand.setMaxvalue(999999);
- // rand.setTuplesBlastIntervalMillis(50);
- // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+ // rand.setMinvalue(0);
+ // rand.setMaxvalue(999999);
+ // rand.setTuplesBlastIntervalMillis(50);
+ // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
StreamDuplicater stream = dag.addOperator("stream", new StreamDuplicater());
dag.getMeta(stream).getMeta(stream.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("streamdup1", intInput.integer_data, stream.data).setLocality(locality);
DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
DevNull<Integer> dev2 = dag.addOperator("dev2", new DevNull());
- dag.addStream("streamdup2",stream.out1,dev1.data).setLocality(locality);
- dag.addStream("streamdup3",stream.out2,dev2.data).setLocality(locality);
+ dag.addStream("streamdup2", stream.out1, dev1.data).setLocality(locality);
+ dag.addStream("streamdup3", stream.out2, dev2.data).setLocality(locality);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
index d90320c..bb1d081 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.benchmark.stream;
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.benchmark.WordCountOperator;
import com.datatorrent.lib.stream.StreamMerger;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for StreamMerge Operator.
@@ -46,7 +47,7 @@ public class StreamMergeApp implements StreamingApplication
StreamMerger stream = dag.addOperator("stream", new StreamMerger());
dag.getMeta(stream).getMeta(stream.data1).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.getMeta(stream).getMeta(stream.data2).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
- dag.addStream("streammerge1", intInput.integer_data, stream.data1,stream.data2).setLocality(locality);
+ dag.addStream("streammerge1", intInput.integer_data, stream.data1, stream.data2).setLocality(locality);
WordCountOperator<Integer> counter = dag.addOperator("counter", new WordCountOperator<Integer>());
dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
index 419de18..b1ddbee 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java
@@ -18,6 +18,11 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,9 +30,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventClassifier Operator.
@@ -75,7 +77,8 @@ public class EventClassifierApp implements StreamingApplication
eventClassifier.setKeyMap(keymap);
eventClassifier.setOperationReplace();
eventClassifier.setKeyWeights(wmap);
- dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
dag.addStream("eventtest1", hmapOper.hmap_data, eventClassifier.event).setLocality(locality);
DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
dag.addStream("eventtest2", eventClassifier.data, dev.data).setLocality(locality);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
index a49b30e..5fe478b 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java
@@ -18,6 +18,10 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -26,8 +30,6 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.benchmark.WordCountOperator;
import com.datatorrent.benchmark.stream.IntegerOperator;
import com.datatorrent.lib.testbench.EventClassifierNumberToHashDouble;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventClassifierNumberToHashDouble Operator.
@@ -44,10 +46,14 @@ public class EventClassifierNumberToHashDoubleApp implements StreamingApplicatio
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- WordCountOperator<HashMap<String, Double>> counterString = dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
- dag.getMeta(counterString).getMeta(counterString.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
- EventClassifierNumberToHashDouble eventClassify = dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
- dag.getMeta(eventClassify).getMeta(eventClassify.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ WordCountOperator<HashMap<String, Double>> counterString =
+ dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>());
+ dag.getMeta(counterString).getMeta(counterString.input).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ EventClassifierNumberToHashDouble eventClassify =
+ dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble());
+ dag.getMeta(eventClassify).getMeta(eventClassify.data)
+ .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator());
dag.addStream("eventclassifier2", intInput.integer_data, eventClassify.event).setLocality(locality);
dag.addStream("eventclassifier1", eventClassify.data, counterString.input).setLocality(locality);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
index 3025c7e..8f28ae6 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java
@@ -18,6 +18,10 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -25,8 +29,6 @@ import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventGenerator;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventGenerator Operator.
@@ -44,7 +46,8 @@ public class EventGeneratorApp implements StreamingApplication
public void populateDAG(DAG dag, Configuration conf)
{
EventGenerator eventGenerator = dag.addOperator("eventGenerator", new EventGenerator());
- dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
+ dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes()
+ .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
DevNull<String> devString = dag.addOperator("devString", new DevNull());
DevNull<HashMap<String, Double>> devMap = dag.addOperator("devMap", new DevNull());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
index ea05d07..e562224 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.EventIncrementer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for EventIncrementer Operator.
@@ -39,6 +41,7 @@ public class EventIncrementerApp implements StreamingApplication
{
private final Locality locality = null;
public static final int QUEUE_CAPACITY = 16 * 1024;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
@@ -55,15 +58,14 @@ public class EventIncrementerApp implements StreamingApplication
eventInc.setKeylimits(keys, low, high);
eventInc.setDelta(1);
HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
- dag.addStream("eventIncInput1",hmapOper.hmapList_data,eventInc.seed);
- dag.addStream("eventIncInput2",hmapOper.hmapMap_data,eventInc.increment);
- DevNull<HashMap<String,Integer>> dev1= dag.addOperator("dev1", new DevNull());
- DevNull<HashMap<String,String>> dev2= dag.addOperator("dev2", new DevNull());
- dag.addStream("eventIncOutput1",eventInc.count,dev1.data).setLocality(locality);
- dag.addStream("eventIncOutput2",eventInc.data,dev2.data).setLocality(locality);
+ dag.addStream("eventIncInput1", hmapOper.hmapList_data, eventInc.seed);
+ dag.addStream("eventIncInput2", hmapOper.hmapMap_data, eventInc.increment);
+ DevNull<HashMap<String, Integer>> dev1 = dag.addOperator("dev1", new DevNull());
+ DevNull<HashMap<String, String>> dev2 = dag.addOperator("dev2", new DevNull());
+ dag.addStream("eventIncOutput1", eventInc.count, dev1.data).setLocality(locality);
+ dag.addStream("eventIncOutput2", eventInc.data, dev2.data).setLocality(locality);
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
index 915e6f0..ea2943f 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.FilterClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for FilterClassifier Operator.
@@ -39,6 +41,7 @@ public class FilterClassifierApp implements StreamingApplication
{
private final Locality locality = null;
public static final int QUEUE_CAPACITY = 16 * 1024;
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
@@ -80,9 +83,9 @@ public class FilterClassifierApp implements StreamingApplication
filter.setTotalFilter(100);
HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator());
- DevNull<HashMap<String,Double>> dev = dag.addOperator("dev", new DevNull());
- dag.addStream("filter1",hmapOper.hmap_data,filter.data).setLocality(locality);
- dag.addStream("filer2",filter.filter,dev.data).setLocality(locality);
+ DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull());
+ dag.addStream("filter1", hmapOper.hmap_data, filter.data).setLocality(locality);
+ dag.addStream("filer2", filter.filter, dev.data).setLocality(locality);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
index c3d996e..52c0bed 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java
@@ -18,15 +18,17 @@
*/
package com.datatorrent.benchmark.testbench;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.lib.testbench.FilteredEventClassifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
/**
* Benchmark App for FilteredEventClassifier Operator.