Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:52 UTC

[14/16] samza git commit: SAMZA-1434: Fix issues found in Hadoop

SAMZA-1434: Fix issues found in Hadoop

Fix the following bugs found when running Samza on Hadoop:

1. Allow an output partition count of 0, since an HDFS output stream can be an empty folder.
2. Add a null check to changelog topic generation so a missing changelog stream is not dereferenced.
3. Call getStreamSpec() instead of reading the streamSpec member in StreamEdge, because getStreamSpec() applies additional transformations.
4. Bound the auto-generated intermediate topic partition count by a fixed maximum (256); a minimal sketch follows this list.
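
To illustrate item 4, here is a minimal standalone sketch of the bounded partition inference that the ExecutionPlanner hunk below implements. The class and method names are hypothetical; only the formula and the 256 cap come from the patch.

    public class PartitionInferenceSketch {
      // Cap taken from the patch (MAX_INFERRED_PARTITIONS = 256).
      private static final int MAX_INFERRED_PARTITIONS = 256;

      // Take the larger of the max input / max output partition counts,
      // then cap the result so a Hadoop input with a very high partition
      // count does not inflate the auto-generated intermediate topics.
      static int inferIntermediatePartitions(int maxInPartitions, int maxOutPartitions) {
        return Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS);
      }

      public static void main(String[] args) {
        // Example: 512 input partitions and 64 output partitions give
        // max(512, 64) = 512, which is then capped to 256.
        System.out.println(inferIntermediatePartitions(512, 64)); // prints 256
      }
    }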

Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jagadish Venkatraman <ja...@apache.org>

Closes #307 from xinyuiscool/SAMZA-1434


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a1f01444
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a1f01444
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a1f01444

Branch: refs/heads/master
Commit: a1f01444ec12f49684213cc69b1cce16ff0f8232
Parents: 2819cbc
Author: Xinyu Liu <xi...@gmail.com>
Authored: Fri Sep 29 15:05:55 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Fri Sep 29 15:05:55 2017 -0700

----------------------------------------------------------------------
 samza-api/src/main/java/org/apache/samza/system/StreamSpec.java | 5 +++--
 .../main/java/org/apache/samza/config/JavaStorageConfig.java    | 4 +++-
 .../main/java/org/apache/samza/execution/ExecutionPlanner.java  | 5 ++++-
 .../src/main/java/org/apache/samza/execution/StreamEdge.java    | 3 ++-
 4 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 8d7401a..6ea1a22 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -158,8 +158,9 @@ public class StreamSpec {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
-    if (partitionCount < 1) {
-      throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
+    // partition count being 0 is a valid use case in Hadoop when the output stream is an empty folder
+    if (partitionCount < 0) {
+      throw new IllegalArgumentException("Parameter 'partitionCount' must be >= 0");
     }
 
     this.id = id;
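
A minimal standalone sketch of the relaxed check above, with a hypothetical helper name; the validation itself matches the patched StreamSpec constructor: a partition count of 0 (an empty HDFS output folder) is now accepted, while negative counts still fail.

    public class PartitionCountCheckSketch {
      // Mirrors the patched check: 0 is a valid partition count
      // (e.g. an empty HDFS output folder); only negatives are rejected.
      static void validatePartitionCount(int partitionCount) {
        if (partitionCount < 0) {
          throw new IllegalArgumentException("Parameter 'partitionCount' must be >= 0");
        }
      }

      public static void main(String[] args) {
        validatePartitionCount(0);   // accepted after this patch
        validatePartitionCount(-1);  // still throws IllegalArgumentException
      }
    }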

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 4e9a58a..34e5683 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -73,7 +73,9 @@ public class JavaStorageConfig extends MapConfig {
       systemStreamRes = systemStream;
     }
 
-    systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this);
+    if (systemStreamRes != null) {
+      systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this);
+    }
     return systemStreamRes;
   }
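
A minimal standalone sketch of the guard above, using hypothetical names; the point of the patch is simply to skip the batch-unique renaming when no changelog stream is configured, so a null stream name is returned as-is instead of being dereferenced.

    public class ChangelogNullCheckSketch {
      // Stand-in for StreamManager.createUniqueNameForBatch(...); the real method
      // derives a batch-unique name, here we just tag the name for illustration.
      static String createUniqueNameForBatch(String streamName) {
        return streamName + "-batch-1";
      }

      // Mirrors the patched tail of JavaStorageConfig.getChangelogStream():
      // only rewrite the name when a changelog stream is actually configured.
      static String resolveChangelogStream(String configuredStream) {
        String result = configuredStream;
        if (result != null) {
          result = createUniqueNameForBatch(result);
        }
        return result; // null stays null instead of causing a NullPointerException
      }
    }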
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e258d13..998ea1e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
+  private static final int MAX_INFERRED_PARTITIONS = 256;
+
   private final Config config;
   private final StreamManager streamManager;
 
@@ -253,9 +255,10 @@ public class ExecutionPlanner {
     if (partitions < 0) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
+      // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is important when running in hadoop.
       int maxInPartitions = maxPartition(jobGraph.getSources());
       int maxOutPartitions = maxPartition(jobGraph.getSinks());
-      partitions = Math.max(maxInPartitions, maxOutPartitions);
+      partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS);
     }
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index f545490..792fde5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -82,7 +82,8 @@ public class StreamEdge {
   }
 
   SystemStream getSystemStream() {
-    return new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+    StreamSpec spec = getStreamSpec();
+    return new SystemStream(spec.getSystemName(), spec.getPhysicalName());
   }
 
   String getFormattedSystemStream() {
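
A minimal sketch of the pattern behind the StreamEdge change above, with hypothetical names: read derived state through its accessor rather than the raw field, so any extra transformation the accessor performs (getStreamSpec() does more than return the member, per the commit message) is not silently skipped. The batch suffix below is made up for illustration.

    public class AccessorVsFieldSketch {
      private final String rawPhysicalName;
      private final boolean isBatch;

      AccessorVsFieldSketch(String rawPhysicalName, boolean isBatch) {
        this.rawPhysicalName = rawPhysicalName;
        this.isBatch = isBatch;
      }

      // The accessor applies an extra transformation, analogous to
      // StreamEdge.getStreamSpec() doing more work than returning the field.
      String getPhysicalName() {
        return isBatch ? rawPhysicalName + "-batch" : rawPhysicalName;
      }

      // Correct: goes through the accessor, so the transformation is applied.
      String describe() {
        return "stream:" + getPhysicalName();
      }
    }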