You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/14 09:05:48 UTC

[1/3] incubator-apex-malhar git commit: APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8b37ee372 -> 9ba99b0ca


APEXMALHAR-1970 #resolve #comment Fix the IndexOutOfBoundsException in OneToManyPartitioner and add tests for both the one_to_one and one_to_many partition strategies


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b431eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b431eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b431eb34

Branch: refs/heads/devel-3
Commit: b431eb34f497f113b30158ae2033be3318de28bc
Parents: 04c9f52
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 11 22:09:25 2016 -0800

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  4 ++--
 .../apex/malhar/kafka/OneToManyPartitioner.java |  4 ++--
 .../malhar/kafka/KafkaInputOperatorTest.java    | 23 ++++++++++++--------
 3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 0fdd721..2159e4f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -257,13 +257,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       }
       PartitionMeta that = (PartitionMeta)o;
       return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(topicPartition, that.topicPartition);
+        Objects.equals(getTopicPartition(), that.getTopicPartition());
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, topicPartition);
+      return Objects.hash(cluster, getTopicPartition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 09d22eb..736727e 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -52,8 +52,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
       for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
         for (PartitionInfo pif : topicPartition.getValue()) {
           int index = i++ % partitionCount;
-          if (eachPartitionAssignment.get(index) == null) {
-            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
           }
           eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private int totalBrokers = 0;
 
+  private String partition = null;
 
-
-  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
-  public static Collection<Boolean[]> testScenario()
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+  public static Collection<Object[]> testScenario()
   {
-    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
-      {true, true}, // multi cluster with multi partitions
-      {false, true}, // single cluster with multi partitions
-      {false, false}, // single cluster with single partitions
+    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+      {true, false, "one_to_many"},
+      {true, true, "one_to_one"},// multi cluster with multi partitions
+      {true, true, "one_to_many"},
+      {false, true, "one_to_one"}, // single cluster with multi partitions
+      {false, true, "one_to_many"},
+      {false, false, "one_to_one"}, // single cluster with single partitions
+      {false, false, "one_to_many"}
     });
   }
 
-  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
   {
     // This class want to initialize several kafka brokers for multiple partitions
     this.hasMultiCluster = hasMultiCluster;
     this.hasMultiPartition = hasMultiPartition;
     int cluster = 1 + (hasMultiCluster ? 1 : 0);
     totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+    this.partition = partition;
   }
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     node.setTopics(TEST_TOPIC);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
+    node.setStrategy(partition);
 
     // Create Test tuple collector
     CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());


[2/3] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message

Posted by th...@apache.org.
APEXMALHAR-1973 #comment Disable committing offsets when the initial offset is LATEST or EARLIEST, and track the offset of the next message (last emitted offset + 1) instead of the last emitted message


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c464f064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c464f064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c464f064

Branch: refs/heads/devel-3
Commit: c464f064b590b786f257a765df026cb29a50c8ae
Parents: b431eb3
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 13 17:12:08 2016 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java  | 4 +++-
 .../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java    | 7 ++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4f2f704..c021c1c 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -177,6 +177,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @Override
   public void committed(long windowId)
   {
+    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+      return;
     //ask kafka consumer wrapper to store the committed offsets
     for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -202,7 +204,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
       emitTuple(tuple.getLeft(), msg);
       AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
           msg.topic(), msg.partition());
-      offsetTrack.put(pm, msg.offset());
+      offsetTrack.put(pm, msg.offset() + 1);
     }
     emitCount += count;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 2159e4f..57c6998 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -256,14 +256,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
         return false;
       }
       PartitionMeta that = (PartitionMeta)o;
-      return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(getTopicPartition(), that.getTopicPartition());
+      return Objects.equals(partitionId, that.partitionId) &&
+        Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topic, that.topic);
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, getTopicPartition());
+      return Objects.hash(cluster, topic, partitionId);
     }
 
     @Override


[3/3] incubator-apex-malhar git commit: Merge branch 'MALHAR-1970' of https://github.com/siyuanh/incubator-apex-malhar into devel-3

Posted by th...@apache.org.
Merge branch 'MALHAR-1970' of https://github.com/siyuanh/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9ba99b0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9ba99b0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9ba99b0c

Branch: refs/heads/devel-3
Commit: 9ba99b0ca3cc556f8710393b2ee026bfbf21bb84
Parents: 8b37ee3 c464f06
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Jan 13 23:53:27 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Jan 13 23:53:27 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  4 +++-
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  7 +++---
 .../apex/malhar/kafka/OneToManyPartitioner.java |  4 ++--
 .../malhar/kafka/KafkaInputOperatorTest.java    | 23 ++++++++++++--------
 4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------