You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/29 19:36:46 UTC

[7/8] samza git commit: SAMZA-1516: Another round of issues found by BEAM tests

SAMZA-1516: Another round of issues found by BEAM tests

A couple more fixes: 1. Fix a bug in identifying the input streams for an operator. 2. For partitionBy, set the partitionKey to 0L when the key is null.

Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jagadish V <vj...@gmail.com>

Closes #370 from xinyuiscool/SAMZA-1516


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12e61e98
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12e61e98
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12e61e98

Branch: refs/heads/0.14.0
Commit: 12e61e98c18a50cb04e368201d3a5797bc4eb7b0
Parents: 3b2a1fa
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Wed Nov 29 11:34:28 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Nov 29 11:34:28 2017 -0800

----------------------------------------------------------------------
 .../samza/operators/impl/OperatorImplGraph.java       | 14 +++++++++++---
 .../samza/operators/impl/PartitionByOperatorImpl.java |  3 ++-
 2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 49b29c8..0bb12d2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -155,6 +155,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       SystemStream inputStream, Config config, TaskContext context) {
+
     if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
@@ -174,9 +175,16 @@ public class OperatorImplGraph {
         });
       return operatorImpl;
     } else {
-      // the implementation corresponding to operatorSpec has already been instantiated
-      // and registered, so we do not need to traverse the DAG further.
-      return operatorImpls.get(operatorSpec.getOpId());
+      // the implementation corresponding to operatorSpec has already been instantiated and registered.
+      OperatorImpl operatorImpl = operatorImpls.get(operatorSpec.getOpId());
+      operatorImpl.registerInputStream(inputStream);
+
+      // We still need to traverse the DAG further to register the input streams.
+      Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+      registeredSpecs.forEach(registeredSpec -> {
+          createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
+        });
+      return operatorImpl;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 424c10f..b3fb4b2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -77,7 +77,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
       TaskCoordinator coordinator) {
     K key = keyFunction.apply(message);
     V value = valueFunction.apply(message);
-    collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+    Long partitionKey = key == null ? 0L : null;
+    collector.send(new OutgoingMessageEnvelope(systemStream, partitionKey, key, value));
     return Collections.emptyList();
   }