You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/03/12 19:39:36 UTC

[samza] branch master updated: SAMZA-2126: Bug fixes for batch-mode generated stream specs (#950)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dea84c  SAMZA-2126: Bug fixes for batch-mode generated stream specs (#950)
8dea84c is described below

commit 8dea84c944540884d6609be9e1c0a8f93aadb6ab
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Mar 12 12:17:42 2019 -0700

    SAMZA-2126: Bug fixes for batch-mode generated stream specs (#950)
---
 .../java/org/apache/samza/execution/StreamEdge.java  |  7 ++++++-
 .../org/apache/samza/execution/TestStreamEdge.java   | 11 +++++++++--
 .../apache/samza/system/kafka/KafkaSystemAdmin.java  |  5 ++++-
 .../samza/system/kafka/KafkaSystemFactory.scala      |  2 +-
 .../system/kafka_deprecated/KafkaSystemAdmin.scala   |  5 ++++-
 .../system/kafka_deprecated/KafkaSystemFactory.scala |  1 -
 .../system/kafka/TestKafkaSystemFactoryJava.java     | 20 +++++++++++++++-----
 7 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 051abcf..c318118 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -75,7 +75,12 @@ public class StreamEdge {
     StreamSpec spec = (partitions == PARTITIONS_UNKNOWN) ?
         streamSpec : streamSpec.copyWithPartitionCount(partitions);
 
-    if (isIntermediate) {
+    // For an intermediate stream whose physical name is the same as its id,
+    // meaning the physical name was auto-generated and not overridden
+    // by the user or by batch processing.
+    if (isIntermediate && spec.getId().equals(spec.getPhysicalName())) {
+      // Append unique id to the batch intermediate streams
+      // Note this will only happen for batch processing
       String physicalName = StreamManager.createUniqueNameForBatch(spec.getPhysicalName(), config);
       if (!physicalName.equals(spec.getPhysicalName())) {
         spec = spec.copyWithPhysicalName(physicalName);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 5928db1..408f638 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -50,8 +50,15 @@ public class TestStreamEdge {
     Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
     config.put(ApplicationConfig.APP_RUN_ID, "123");
-    StreamEdge edge = new StreamEdge(spec, true, false, new MapConfig(config));
-    assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName() + "-123");
+
+    // For batch, if the physical name hasn't been generated, it should append the unique id
+    StreamSpec batchSpec = new StreamSpec("stream-1", "stream-1", "system-1");
+    StreamEdge edge = new StreamEdge(batchSpec, true, false, new MapConfig(config));
+    assertEquals(edge.getStreamSpec().getPhysicalName(), batchSpec.getPhysicalName() + "-123");
+
+    // If the physical name has already been generated somehow, it should not change
+    edge = new StreamEdge(spec, true, false, new MapConfig(config));
+    assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName());
   }
 
   @Test
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index c3c66c7..36aa695 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -581,7 +581,10 @@ public class KafkaSystemAdmin implements SystemAdmin {
           new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
               coordinatorStreamProperties);
     } else if (intermediateStreamProperties.containsKey(spec.getId())) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties.get(spec.getId()));
+      kafkaSpec = KafkaStreamSpec.fromSpec(spec);
+      Properties properties = kafkaSpec.getProperties();
+      properties.putAll(intermediateStreamProperties.get(spec.getId()));
+      kafkaSpec = kafkaSpec.copyWithProperties(properties);
     } else {
       kafkaSpec = KafkaStreamSpec.fromSpec(spec);
     }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 035d3a8..d6d3340 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -109,8 +109,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     if (appConfig.getAppMode == ApplicationMode.BATCH) {
       val streamConfig = new StreamConfig(config)
       streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
+        // Only the batch overrides here; KafkaSystemAdmin merges them into the spec's properties
         val properties = new Properties()
-        properties.putAll(streamConfig.getStreamProperties(streamId))
         properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
         (streamId, properties)
       }).toMap
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
index f16e507..36c819f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
@@ -428,7 +428,10 @@ class KafkaSystemAdmin(
       new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor,
         coordinatorStreamProperties)
     } else if (intermediateStreamProperties.contains(spec.getId)) {
-      KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
+      val kafkaSpec = KafkaStreamSpec.fromSpec(spec)
+      val properties = kafkaSpec.getProperties
+      properties.putAll(intermediateStreamProperties(spec.getId))
+      kafkaSpec.copyWithProperties(properties)
     } else {
       KafkaStreamSpec.fromSpec(spec)
     }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index cf7c5e8..7e8509d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -158,7 +158,6 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       val streamConfig = new StreamConfig(config)
       streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
         val properties = new Properties()
-        properties.putAll(streamConfig.getStreamProperties(streamId))
         properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
         (streamId, properties)
       }).toMap
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
index a886bab..130f0f0 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
@@ -22,17 +22,20 @@ package org.apache.samza.system.kafka;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 import scala.collection.JavaConversions;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class TestKafkaSystemFactoryJava {
+public class TestKafkaSystemFactoryJava extends TestKafkaSystemAdmin {
 
   @Test
   public void testGetIntermediateStreamProperties() {
@@ -50,11 +53,18 @@ public class TestKafkaSystemFactoryJava {
     assertTrue(properties.isEmpty());
 
     config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
-    properties = JavaConversions.mapAsJavaMap(
-        factory.getIntermediateStreamProperties(new MapConfig(config)));
-    assertTrue(!properties.isEmpty());
-    Properties prop = properties.get("test");
+
+    KafkaSystemAdmin admin = createSystemAdmin(SYSTEM(), config);
+    StreamSpec spec = new StreamSpec("test", "test", SYSTEM(),
+        Collections.singletonMap("replication.factor", "1"));
+    KafkaStreamSpec kspec = admin.toKafkaSpec(spec);
+
+    Properties prop = kspec.getProperties();
     assertEquals(prop.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
     assertEquals(prop.getProperty("compression.type"), "lz4");
+
+    // replication.factor should be removed from the properties and set on the spec directly
+    assertEquals(kspec.getReplicationFactor(), 1);
+    assertNull(prop.getProperty("replication.factor"));
   }
 }