You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:08 UTC

[62/98] [abbrv] incubator-apex-malhar git commit: Cover distributeddistinct in CI build.

Cover distributeddistinct in CI build.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9587b03e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9587b03e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9587b03e

Branch: refs/heads/master
Commit: 9587b03e26ab884c6877701dfdbd5778aa2a5a81
Parents: a6ba9a4
Author: Thomas Weise <th...@datatorrent.com>
Authored: Fri Oct 16 20:35:22 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Oct 16 20:35:22 2015 -0700

----------------------------------------------------------------------
 demos/distributedistinct/pom.xml                |  4 +--
 .../demos/distributeddistinct/Application.java  |  9 +++--
 .../UniqueValueCountAppender.java               | 17 ++-------
 .../DistributedDistinctTest.java                |  4 +--
 .../StatefulUniqueCountTest.java                |  5 +--
 demos/pom.xml                                   | 36 ++++++++++++--------
 6 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/pom.xml
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/pom.xml b/demos/distributedistinct/pom.xml
index 5f608a7..dac02ae 100644
--- a/demos/distributedistinct/pom.xml
+++ b/demos/distributedistinct/pom.xml
@@ -22,8 +22,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   
-  <groupId>org.apache.apex</groupId>
-  <version>3.0.0</version>
   <artifactId>distributedistinct</artifactId>
   <packaging>jar</packaging>
 
@@ -33,7 +31,7 @@
   <parent>
     <groupId>org.apache.apex</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.0.0</version>
+    <version>3.2.0-incubating-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
index 7f65472..656b083 100644
--- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
+++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.algo.UniqueCounterValue;
 import com.datatorrent.lib.algo.UniqueValueCount;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.stream.Counter;
@@ -61,10 +60,10 @@ public class Application implements StreamingApplication
     dag.addStream("Duplicates", valCount.output, dup.data);
     dag.addStream("Unverified", dup.out1, verifier.recIn);
     dag.addStream("EventCount", randGen.verport, verifier.trueIn);
-    dag.addStream("Verified", verifier.successPort, successcounter.data);
-    dag.addStream("Failed", verifier.failurePort, failurecounter.data);
-    dag.addStream("SuccessCount", successcounter.count, successOutput.input);
-    dag.addStream("FailedCount", failurecounter.count, failureOutput.input);
+    dag.addStream("Verified", verifier.successPort, successcounter.input);
+    dag.addStream("Failed", verifier.failurePort, failurecounter.input);
+    dag.addStream("SuccessCount", successcounter.output, successOutput.input);
+    dag.addStream("FailedCount", failurecounter.output, failureOutput.input);
     dag.addStream("Output", dup.out2, consOut.input);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
index e613e32..7c91f77 100644
--- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
+++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
@@ -42,7 +42,6 @@ import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner;
-
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -91,7 +90,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO
   public void setup(Context.OperatorContext context)
   {
     super.setup(context);
-    LOGGER.debug("store properties {} {}", store.getDbDriver(), store.getDbUrl());
+    LOGGER.debug("store properties {} {}", store.getDatabaseDriver(), store.getDatabaseUrl());
     LOGGER.debug("table name {}", tableName);
     windowID = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
     try {
@@ -197,19 +196,9 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO
    * rollback, each partition will only clear the data that it is responsible for.
    */
   @Override
-  public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, int incrementalCapacity)
+  public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, PartitioningContext context)
   {
-    final int finalCapacity;
-
-    //In the case of parallel partitioning
-    if(incrementalCapacity != 0) {
-      finalCapacity = incrementalCapacity;
-    }
-    //Do normal partitioning
-    else {
-      finalCapacity = partitionCount;
-    }
-
+    final int finalCapacity = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount);
     UniqueValueCountAppender<V> anOldOperator = partitions.iterator().next().getPartitionedInstance();
     partitions.clear();
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
index f5acbd6..d32047a 100644
--- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
+++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
@@ -192,8 +192,8 @@ public class DistributedDistinctTest
     attributes.put(DAG.APPLICATION_PATH, applicationPath);
     attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L);
     valueCounter.setTableName(TABLE_NAME);
-    valueCounter.getStore().setDbDriver(INMEM_DB_DRIVER);
-    valueCounter.getStore().setDbUrl(INMEM_DB_URL);
+    valueCounter.getStore().setDatabaseDriver(INMEM_DB_DRIVER);
+    valueCounter.getStore().setDatabaseUrl(INMEM_DB_URL);
     TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
     valueCounter.setup(context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
----------------------------------------------------------------------
diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
index 93ec636..55f1c8e 100644
--- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
+++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.demos.distributeddistinct.IntegerUniqueValueCountAppender;
 
 import com.datatorrent.lib.algo.UniqueValueCount;
@@ -41,7 +42,7 @@ public class StatefulUniqueCountTest
   public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
   public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
   public static final String TABLE_NAME = "Test_Lookup_Cache";
-  
+
   static class KeyGen implements InputOperator
   {
 
@@ -193,7 +194,7 @@ public class StatefulUniqueCountTest
       dag.addStream("ResultsOut", uniqueOut, verifyTable.input);
     }
   }
-  
+
   @BeforeClass
   public static void setup(){
     try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 123591f..97382ba 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -32,21 +32,6 @@
   <packaging>pom</packaging>
   <name>Apache Apex Malhar Demos</name>
 
-  <modules>
-    <module>machinedata</module>
-    <module>pi</module>
-    <module>twitter</module>
-    <module>yahoofinance</module>
-    <module>frauddetect</module>
-    <module>mobile</module>
-    <module>wordcount</module>
-    <module>mrmonitor</module>
-    <module>mroperator</module>
-    <module>uniquecount</module>
-    <module>r</module>
-    <module>echoserver</module>
-  </modules>
-
   <properties>
     <apex.core.version>3.2.0-incubating-SNAPSHOT</apex.core.version>
     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
@@ -177,8 +162,29 @@
       </plugins>
 	</build>
 	</profile>
+    <profile>
+      <id>all-modules</id>
+      <modules>
+        <module>distributedistinct</module>
+      </modules>
+    </profile>
   </profiles>
 
+  <modules>
+    <module>machinedata</module>
+    <module>pi</module>
+    <module>twitter</module>
+    <module>yahoofinance</module>
+    <module>frauddetect</module>
+    <module>mobile</module>
+    <module>wordcount</module>
+    <module>mrmonitor</module>
+    <module>mroperator</module>
+    <module>uniquecount</module>
+    <module>r</module>
+    <module>echoserver</module>
+  </modules>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.apex</groupId>