You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/14 09:05:48 UTC
[1/3] incubator-apex-malhar git commit: APEXMALHAR-1970 #resolve
#comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for
both one_to_one and one_to_many partition
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 8b37ee372 -> 9ba99b0ca
APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b431eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b431eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b431eb34
Branch: refs/heads/devel-3
Commit: b431eb34f497f113b30158ae2033be3318de28bc
Parents: 04c9f52
Author: Siyuan Hua <hs...@apache.org>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Mon Jan 11 22:09:25 2016 -0800
----------------------------------------------------------------------
.../malhar/kafka/AbstractKafkaPartitioner.java | 4 ++--
.../apex/malhar/kafka/OneToManyPartitioner.java | 4 ++--
.../malhar/kafka/KafkaInputOperatorTest.java | 23 ++++++++++++--------
3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 0fdd721..2159e4f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -257,13 +257,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
}
PartitionMeta that = (PartitionMeta)o;
return Objects.equals(cluster, that.cluster) &&
- Objects.equals(topicPartition, that.topicPartition);
+ Objects.equals(getTopicPartition(), that.getTopicPartition());
}
@Override
public int hashCode()
{
- return Objects.hash(cluster, topicPartition);
+ return Objects.hash(cluster, getTopicPartition());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 09d22eb..736727e 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -52,8 +52,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
for (PartitionInfo pif : topicPartition.getValue()) {
int index = i++ % partitionCount;
- if (eachPartitionAssignment.get(index) == null) {
- eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+ if (index >= eachPartitionAssignment.size()) {
+ eachPartitionAssignment.add(new HashSet<PartitionMeta>());
}
eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
private int totalBrokers = 0;
+ private String partition = null;
-
- @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
- public static Collection<Boolean[]> testScenario()
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+ public static Collection<Object[]> testScenario()
{
- return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
- {true, true}, // multi cluster with multi partitions
- {false, true}, // single cluster with multi partitions
- {false, false}, // single cluster with single partitions
+ return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+ {true, false, "one_to_many"},
+ {true, true, "one_to_one"},// multi cluster with multi partitions
+ {true, true, "one_to_many"},
+ {false, true, "one_to_one"}, // single cluster with multi partitions
+ {false, true, "one_to_many"},
+ {false, false, "one_to_one"}, // single cluster with single partitions
+ {false, false, "one_to_many"}
});
}
- public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+ public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
{
// This class want to initialize several kafka brokers for multiple partitions
this.hasMultiCluster = hasMultiCluster;
this.hasMultiPartition = hasMultiPartition;
int cluster = 1 + (hasMultiCluster ? 1 : 0);
totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+ this.partition = partition;
}
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
node.setTopics(TEST_TOPIC);
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
node.setClusters(getClusterConfig());
+ node.setStrategy(partition);
// Create Test tuple collector
CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());
[2/3] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment
disable committing offsets for latest|earliest and store the offset for next
message
Posted by th...@apache.org.
APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c464f064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c464f064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c464f064
Branch: refs/heads/devel-3
Commit: c464f064b590b786f257a765df026cb29a50c8ae
Parents: b431eb3
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 13 17:12:08 2016 -0800
----------------------------------------------------------------------
.../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 4 +++-
.../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java | 7 ++++---
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4f2f704..c021c1c 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -177,6 +177,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
@Override
public void committed(long windowId)
{
+ if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+ return;
//ask kafka consumer wrapper to store the committed offsets
for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -202,7 +204,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
emitTuple(tuple.getLeft(), msg);
AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
msg.topic(), msg.partition());
- offsetTrack.put(pm, msg.offset());
+ offsetTrack.put(pm, msg.offset() + 1);
}
emitCount += count;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 2159e4f..57c6998 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -256,14 +256,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
return false;
}
PartitionMeta that = (PartitionMeta)o;
- return Objects.equals(cluster, that.cluster) &&
- Objects.equals(getTopicPartition(), that.getTopicPartition());
+ return Objects.equals(partitionId, that.partitionId) &&
+ Objects.equals(cluster, that.cluster) &&
+ Objects.equals(topic, that.topic);
}
@Override
public int hashCode()
{
- return Objects.hash(cluster, getTopicPartition());
+ return Objects.hash(cluster, topic, partitionId);
}
@Override
[3/3] incubator-apex-malhar git commit: Merge branch 'MALHAR-1970' of
https://github.com/siyuanh/incubator-apex-malhar into devel-3
Posted by th...@apache.org.
Merge branch 'MALHAR-1970' of https://github.com/siyuanh/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9ba99b0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9ba99b0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9ba99b0c
Branch: refs/heads/devel-3
Commit: 9ba99b0ca3cc556f8710393b2ee026bfbf21bb84
Parents: 8b37ee3 c464f06
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Jan 13 23:53:27 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Jan 13 23:53:27 2016 -0800
----------------------------------------------------------------------
.../kafka/AbstractKafkaInputOperator.java | 4 +++-
.../malhar/kafka/AbstractKafkaPartitioner.java | 7 +++---
.../apex/malhar/kafka/OneToManyPartitioner.java | 4 ++--
.../malhar/kafka/KafkaInputOperatorTest.java | 23 ++++++++++++--------
4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------