You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:59 UTC
[42/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition
APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b431eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b431eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b431eb34
Branch: refs/heads/master
Commit: b431eb34f497f113b30158ae2033be3318de28bc
Parents: 04c9f52
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 11 22:09:25 2016 -0800
----------------------------------------------------------------------
.../malhar/kafka/AbstractKafkaPartitioner.java | 4 ++--
.../apex/malhar/kafka/OneToManyPartitioner.java | 4 ++--
.../malhar/kafka/KafkaInputOperatorTest.java | 23 ++++++++++++--------
3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 0fdd721..2159e4f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -257,13 +257,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
}
PartitionMeta that = (PartitionMeta)o;
return Objects.equals(cluster, that.cluster) &&
- Objects.equals(topicPartition, that.topicPartition);
+ Objects.equals(getTopicPartition(), that.getTopicPartition());
}
@Override
public int hashCode()
{
- return Objects.hash(cluster, topicPartition);
+ return Objects.hash(cluster, getTopicPartition());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 09d22eb..736727e 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -52,8 +52,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
for (PartitionInfo pif : topicPartition.getValue()) {
int index = i++ % partitionCount;
- if (eachPartitionAssignment.get(index) == null) {
- eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+ if (index >= eachPartitionAssignment.size()) {
+ eachPartitionAssignment.add(new HashSet<PartitionMeta>());
}
eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
private int totalBrokers = 0;
+ private String partition = null;
-
- @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
- public static Collection<Boolean[]> testScenario()
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+ public static Collection<Object[]> testScenario()
{
- return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
- {true, true}, // multi cluster with multi partitions
- {false, true}, // single cluster with multi partitions
- {false, false}, // single cluster with single partitions
+ return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+ {true, false, "one_to_many"},
+ {true, true, "one_to_one"},// multi cluster with multi partitions
+ {true, true, "one_to_many"},
+ {false, true, "one_to_one"}, // single cluster with multi partitions
+ {false, true, "one_to_many"},
+ {false, false, "one_to_one"}, // single cluster with single partitions
+ {false, false, "one_to_many"}
});
}
- public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+ public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
{
// This class want to initialize several kafka brokers for multiple partitions
this.hasMultiCluster = hasMultiCluster;
this.hasMultiPartition = hasMultiPartition;
int cluster = 1 + (hasMultiCluster ? 1 : 0);
totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+ this.partition = partition;
}
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
node.setTopics(TEST_TOPIC);
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
node.setClusters(getClusterConfig());
+ node.setStrategy(partition);
// Create Test tuple collector
CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());