You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/29 19:34:36 UTC
samza git commit: SAMZA-1516: Another round of issues found by BEAM
tests
Repository: samza
Updated Branches:
refs/heads/master 3b2a1fa43 -> 12e61e98c
SAMZA-1516: Another round of issues found by BEAM tests
A couple of more fixes: 1. fix a bug of identifying input streams for an operator. 2. for partitionBy, set the partitionKey to 0L when key is null.
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Reviewers: Jagadish V <vj...@gmail.com>
Closes #370 from xinyuiscool/SAMZA-1516
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12e61e98
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12e61e98
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12e61e98
Branch: refs/heads/master
Commit: 12e61e98c18a50cb04e368201d3a5797bc4eb7b0
Parents: 3b2a1fa
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Wed Nov 29 11:34:28 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Nov 29 11:34:28 2017 -0800
----------------------------------------------------------------------
.../samza/operators/impl/OperatorImplGraph.java | 14 +++++++++++---
.../samza/operators/impl/PartitionByOperatorImpl.java | 3 ++-
2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 49b29c8..0bb12d2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -155,6 +155,7 @@ public class OperatorImplGraph {
*/
OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
SystemStream inputStream, Config config, TaskContext context) {
+
if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
// Either this is the first time we've seen this operatorSpec, or this is a join operator spec
// and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
@@ -174,9 +175,16 @@ public class OperatorImplGraph {
});
return operatorImpl;
} else {
- // the implementation corresponding to operatorSpec has already been instantiated
- // and registered, so we do not need to traverse the DAG further.
- return operatorImpls.get(operatorSpec.getOpId());
+ // the implementation corresponding to operatorSpec has already been instantiated and registered.
+ OperatorImpl operatorImpl = operatorImpls.get(operatorSpec.getOpId());
+ operatorImpl.registerInputStream(inputStream);
+
+ // We still need to traverse the DAG further to register the input streams.
+ Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+ registeredSpecs.forEach(registeredSpec -> {
+ createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
+ });
+ return operatorImpl;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 424c10f..b3fb4b2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -77,7 +77,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
TaskCoordinator coordinator) {
K key = keyFunction.apply(message);
V value = valueFunction.apply(message);
- collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+ Long partitionKey = key == null ? 0L : null;
+ collector.send(new OutgoingMessageEnvelope(systemStream, partitionKey, key, value));
return Collections.emptyList();
}