You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/03/12 19:39:36 UTC
[samza] branch master updated: SAMZA-2126: Bug fixes for batch-mode
generated stream specs (#950)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8dea84c SAMZA-2126: Bug fixes for batch-mode generated stream specs (#950)
8dea84c is described below
commit 8dea84c944540884d6609be9e1c0a8f93aadb6ab
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Mar 12 12:17:42 2019 -0700
SAMZA-2126: Bug fixes for batch-mode generated stream specs (#950)
---
.../java/org/apache/samza/execution/StreamEdge.java | 7 ++++++-
.../org/apache/samza/execution/TestStreamEdge.java | 11 +++++++++--
.../apache/samza/system/kafka/KafkaSystemAdmin.java | 5 ++++-
.../samza/system/kafka/KafkaSystemFactory.scala | 2 +-
.../system/kafka_deprecated/KafkaSystemAdmin.scala | 5 ++++-
.../system/kafka_deprecated/KafkaSystemFactory.scala | 1 -
.../system/kafka/TestKafkaSystemFactoryJava.java | 20 +++++++++++++++-----
7 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 051abcf..c318118 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -75,7 +75,12 @@ public class StreamEdge {
StreamSpec spec = (partitions == PARTITIONS_UNKNOWN) ?
streamSpec : streamSpec.copyWithPartitionCount(partitions);
- if (isIntermediate) {
+ // For an intermediate stream whose physical name is the same as its id,
+ // meaning the physical name is auto-generated and has not been overridden
+ // by the user or by batch processing.
+ if (isIntermediate && spec.getId().equals(spec.getPhysicalName())) {
+ // Append unique id to the batch intermediate streams
+ // Note this will only happen for batch processing
String physicalName = StreamManager.createUniqueNameForBatch(spec.getPhysicalName(), config);
if (!physicalName.equals(spec.getPhysicalName())) {
spec = spec.copyWithPhysicalName(physicalName);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 5928db1..408f638 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -50,8 +50,15 @@ public class TestStreamEdge {
Map<String, String> config = new HashMap<>();
config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
config.put(ApplicationConfig.APP_RUN_ID, "123");
- StreamEdge edge = new StreamEdge(spec, true, false, new MapConfig(config));
- assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName() + "-123");
+
+ // For batch, if the physical name hasn't been generated, it should append the unique id
+ StreamSpec batchSpec = new StreamSpec("stream-1", "stream-1", "system-1");
+ StreamEdge edge = new StreamEdge(batchSpec, true, false, new MapConfig(config));
+ assertEquals(edge.getStreamSpec().getPhysicalName(), batchSpec.getPhysicalName() + "-123");
+
+ // If the physical name has already been generated somehow, then don't change it
+ edge = new StreamEdge(spec, true, false, new MapConfig(config));
+ assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName());
}
@Test
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index c3c66c7..36aa695 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -581,7 +581,10 @@ public class KafkaSystemAdmin implements SystemAdmin {
new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
coordinatorStreamProperties);
} else if (intermediateStreamProperties.containsKey(spec.getId())) {
- kafkaSpec = KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties.get(spec.getId()));
+ kafkaSpec = KafkaStreamSpec.fromSpec(spec);
+ Properties properties = kafkaSpec.getProperties();
+ properties.putAll(intermediateStreamProperties.get(spec.getId()));
+ kafkaSpec = kafkaSpec.copyWithProperties(properties);
} else {
kafkaSpec = KafkaStreamSpec.fromSpec(spec);
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 035d3a8..d6d3340 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -109,8 +109,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
if (appConfig.getAppMode == ApplicationMode.BATCH) {
val streamConfig = new StreamConfig(config)
streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
+ // Only the batch-mode overrides go here; KafkaSystemAdmin merges them onto the stream spec's own properties
val properties = new Properties()
- properties.putAll(streamConfig.getStreamProperties(streamId))
properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
(streamId, properties)
}).toMap
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
index f16e507..36c819f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
@@ -428,7 +428,10 @@ class KafkaSystemAdmin(
new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor,
coordinatorStreamProperties)
} else if (intermediateStreamProperties.contains(spec.getId)) {
- KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
+ val kafkaSpec = KafkaStreamSpec.fromSpec(spec)
+ val properties = kafkaSpec.getProperties
+ properties.putAll(intermediateStreamProperties(spec.getId))
+ kafkaSpec.copyWithProperties(properties)
} else {
KafkaStreamSpec.fromSpec(spec)
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index cf7c5e8..7e8509d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -158,7 +158,6 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val streamConfig = new StreamConfig(config)
streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
val properties = new Properties()
- properties.putAll(streamConfig.getStreamProperties(streamId))
properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
(streamId, properties)
}).toMap
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
index a886bab..130f0f0 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
@@ -22,17 +22,20 @@ package org.apache.samza.system.kafka;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import scala.collection.JavaConversions;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class TestKafkaSystemFactoryJava {
+public class TestKafkaSystemFactoryJava extends TestKafkaSystemAdmin {
@Test
public void testGetIntermediateStreamProperties() {
@@ -50,11 +53,18 @@ public class TestKafkaSystemFactoryJava {
assertTrue(properties.isEmpty());
config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
- properties = JavaConversions.mapAsJavaMap(
- factory.getIntermediateStreamProperties(new MapConfig(config)));
- assertTrue(!properties.isEmpty());
- Properties prop = properties.get("test");
+
+ KafkaSystemAdmin admin = createSystemAdmin(SYSTEM(), config);
+ StreamSpec spec = new StreamSpec("test", "test", SYSTEM(),
+ Collections.singletonMap("replication.factor", "1"));
+ KafkaStreamSpec kspec = admin.toKafkaSpec(spec);
+
+ Properties prop = kspec.getProperties();
assertEquals(prop.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
assertEquals(prop.getProperty("compression.type"), "lz4");
+
+ // replication.factor should be removed from the properties and set on the spec directly
+ assertEquals(kspec.getReplicationFactor(), 1);
+ assertNull(prop.getProperty("replication.factor"));
}
}