You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:08 UTC
[62/98] [abbrv] incubator-apex-malhar git commit: Cover
distributeddistinct in CI build.
Cover distributeddistinct in CI build.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9587b03e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9587b03e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9587b03e
Branch: refs/heads/master
Commit: 9587b03e26ab884c6877701dfdbd5778aa2a5a81
Parents: a6ba9a4
Author: Thomas Weise <th...@datatorrent.com>
Authored: Fri Oct 16 20:35:22 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Oct 16 20:35:22 2015 -0700
----------------------------------------------------------------------
demos/distributedistinct/pom.xml | 4 +--
.../demos/distributeddistinct/Application.java | 9 +++--
.../UniqueValueCountAppender.java | 17 ++-------
.../DistributedDistinctTest.java | 4 +--
.../StatefulUniqueCountTest.java | 5 +--
demos/pom.xml | 36 ++++++++++++--------
6 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/pom.xml
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/pom.xml b/demos/distributedistinct/pom.xml
index 5f608a7..dac02ae 100644
--- a/demos/distributedistinct/pom.xml
+++ b/demos/distributedistinct/pom.xml
@@ -22,8 +22,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.apex</groupId>
- <version>3.0.0</version>
<artifactId>distributedistinct</artifactId>
<packaging>jar</packaging>
@@ -33,7 +31,7 @@
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.0.0</version>
+ <version>3.2.0-incubating-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
index 7f65472..656b083 100644
--- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
+++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.algo.UniqueCounterValue;
import com.datatorrent.lib.algo.UniqueValueCount;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.stream.Counter;
@@ -61,10 +60,10 @@ public class Application implements StreamingApplication
dag.addStream("Duplicates", valCount.output, dup.data);
dag.addStream("Unverified", dup.out1, verifier.recIn);
dag.addStream("EventCount", randGen.verport, verifier.trueIn);
- dag.addStream("Verified", verifier.successPort, successcounter.data);
- dag.addStream("Failed", verifier.failurePort, failurecounter.data);
- dag.addStream("SuccessCount", successcounter.count, successOutput.input);
- dag.addStream("FailedCount", failurecounter.count, failureOutput.input);
+ dag.addStream("Verified", verifier.successPort, successcounter.input);
+ dag.addStream("Failed", verifier.failurePort, failurecounter.input);
+ dag.addStream("SuccessCount", successcounter.output, successOutput.input);
+ dag.addStream("FailedCount", failurecounter.output, failureOutput.input);
dag.addStream("Output", dup.out2, consOut.input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
index e613e32..7c91f77 100644
--- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
+++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
@@ -42,7 +42,6 @@ import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
-
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -91,7 +90,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO
public void setup(Context.OperatorContext context)
{
super.setup(context);
- LOGGER.debug("store properties {} {}", store.getDbDriver(), store.getDbUrl());
+ LOGGER.debug("store properties {} {}", store.getDatabaseDriver(), store.getDatabaseUrl());
LOGGER.debug("table name {}", tableName);
windowID = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
try {
@@ -197,19 +196,9 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO
* rollback, each partition will only clear the data that it is responsible for.
*/
@Override
- public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, int incrementalCapacity)
+ public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, PartitioningContext context)
{
- final int finalCapacity;
-
- //In the case of parallel partitioning
- if(incrementalCapacity != 0) {
- finalCapacity = incrementalCapacity;
- }
- //Do normal partitioning
- else {
- finalCapacity = partitionCount;
- }
-
+ final int finalCapacity = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount);
UniqueValueCountAppender<V> anOldOperator = partitions.iterator().next().getPartitionedInstance();
partitions.clear();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
index f5acbd6..d32047a 100644
--- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
+++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
@@ -192,8 +192,8 @@ public class DistributedDistinctTest
attributes.put(DAG.APPLICATION_PATH, applicationPath);
attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L);
valueCounter.setTableName(TABLE_NAME);
- valueCounter.getStore().setDbDriver(INMEM_DB_DRIVER);
- valueCounter.getStore().setDbUrl(INMEM_DB_URL);
+ valueCounter.getStore().setDatabaseDriver(INMEM_DB_DRIVER);
+ valueCounter.getStore().setDatabaseUrl(INMEM_DB_URL);
TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
valueCounter.setup(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
index 93ec636..55f1c8e 100644
--- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
+++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.demos.distributeddistinct.IntegerUniqueValueCountAppender;
import com.datatorrent.lib.algo.UniqueValueCount;
@@ -41,7 +42,7 @@ public class StatefulUniqueCountTest
public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
public static final String TABLE_NAME = "Test_Lookup_Cache";
-
+
static class KeyGen implements InputOperator
{
@@ -193,7 +194,7 @@ public class StatefulUniqueCountTest
dag.addStream("ResultsOut", uniqueOut, verifyTable.input);
}
}
-
+
@BeforeClass
public static void setup(){
try {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 123591f..97382ba 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -32,21 +32,6 @@
<packaging>pom</packaging>
<name>Apache Apex Malhar Demos</name>
- <modules>
- <module>machinedata</module>
- <module>pi</module>
- <module>twitter</module>
- <module>yahoofinance</module>
- <module>frauddetect</module>
- <module>mobile</module>
- <module>wordcount</module>
- <module>mrmonitor</module>
- <module>mroperator</module>
- <module>uniquecount</module>
- <module>r</module>
- <module>echoserver</module>
- </modules>
-
<properties>
<apex.core.version>3.2.0-incubating-SNAPSHOT</apex.core.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
@@ -177,8 +162,29 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>all-modules</id>
+ <modules>
+ <module>distributedistinct</module>
+ </modules>
+ </profile>
</profiles>
+ <modules>
+ <module>machinedata</module>
+ <module>pi</module>
+ <module>twitter</module>
+ <module>yahoofinance</module>
+ <module>frauddetect</module>
+ <module>mobile</module>
+ <module>wordcount</module>
+ <module>mrmonitor</module>
+ <module>mroperator</module>
+ <module>uniquecount</module>
+ <module>r</module>
+ <module>echoserver</module>
+ </modules>
+
<dependencies>
<dependency>
<groupId>org.apache.apex</groupId>