You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:59 UTC

[42/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition

APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b431eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b431eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b431eb34

Branch: refs/heads/master
Commit: b431eb34f497f113b30158ae2033be3318de28bc
Parents: 04c9f52
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 11 22:09:25 2016 -0800

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  4 ++--
 .../apex/malhar/kafka/OneToManyPartitioner.java |  4 ++--
 .../malhar/kafka/KafkaInputOperatorTest.java    | 23 ++++++++++++--------
 3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 0fdd721..2159e4f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -257,13 +257,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       }
       PartitionMeta that = (PartitionMeta)o;
       return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(topicPartition, that.topicPartition);
+        Objects.equals(getTopicPartition(), that.getTopicPartition());
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, topicPartition);
+      return Objects.hash(cluster, getTopicPartition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 09d22eb..736727e 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -52,8 +52,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
       for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
         for (PartitionInfo pif : topicPartition.getValue()) {
           int index = i++ % partitionCount;
-          if (eachPartitionAssignment.get(index) == null) {
-            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
           }
           eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private int totalBrokers = 0;
 
+  private String partition = null;
 
-
-  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
-  public static Collection<Boolean[]> testScenario()
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+  public static Collection<Object[]> testScenario()
   {
-    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
-      {true, true}, // multi cluster with multi partitions
-      {false, true}, // single cluster with multi partitions
-      {false, false}, // single cluster with single partitions
+    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+      {true, false, "one_to_many"},
+      {true, true, "one_to_one"},// multi cluster with multi partitions
+      {true, true, "one_to_many"},
+      {false, true, "one_to_one"}, // single cluster with multi partitions
+      {false, true, "one_to_many"},
+      {false, false, "one_to_one"}, // single cluster with single partitions
+      {false, false, "one_to_many"}
     });
   }
 
-  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
   {
     // This class want to initialize several kafka brokers for multiple partitions
     this.hasMultiCluster = hasMultiCluster;
     this.hasMultiPartition = hasMultiPartition;
     int cluster = 1 + (hasMultiCluster ? 1 : 0);
     totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+    this.partition = partition;
   }
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     node.setTopics(TEST_TOPIC);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
+    node.setStrategy(partition);
 
     // Create Test tuple collector
     CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());