You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/27 00:53:05 UTC
samza git commit: SAMZA-1967: Tests failing when Job uses any serde
other than NoOp
Repository: samza
Updated Branches:
refs/heads/master 1c2d6effb -> 08496c96d
SAMZA-1967: Tests failing when Job uses any serde other than NoOp
Context: Serdes are configured in JobNodeConfigurationGenerator; StreamDescriptor#toConfig does not generate key and msg serde configs.
Problem: Tests fail when a job uses any serde other than NoOpSerde, because ApplicationDescriptor-generated serde configs take precedence in the absence of user-supplied serde configs.
Solution: Passing null key and msg serde configs in userConfigs for each StreamDescriptor ensures that ApplicationDescriptor-generated serde configs do not take precedence.
prateekm rmatharu please take a look
Author: Sanil15 <sa...@gmail.com>
Reviewers: Prateek M<pm...@linkedin.com>
Closes #764 from Sanil15/SAMZA-1967
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/08496c96
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/08496c96
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/08496c96
Branch: refs/heads/master
Commit: 08496c96d229582456d5fe48f442876745c11112
Parents: 1c2d6ef
Author: Sanil15 <sa...@gmail.com>
Authored: Fri Oct 26 17:51:58 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Oct 26 17:51:58 2018 -0700
----------------------------------------------------------------------
.../apache/samza/test/framework/TestRunner.java | 23 ++++++++++-
.../StreamApplicationIntegrationTest.java | 43 ++++++++++++++------
.../framework/StreamTaskIntegrationTest.java | 5 ++-
3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 531b0ef..c80ce1b 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -39,6 +39,7 @@ import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.ApplicationStatus;
@@ -55,6 +56,7 @@ import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.descriptors.StreamDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;
@@ -81,6 +83,7 @@ import org.slf4j.LoggerFactory;
* <li>"job.host-affinity.enabled" = "false"</li>
* </ol>
*
+ * TestRunner only supports NoOpSerde i.e. inputs to Test Framework should be deserialized
*/
public class TestRunner {
private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
@@ -187,7 +190,7 @@ public class TestRunner {
* Adds the provided input stream with mock data to the test application.
*
* @param descriptor describes the stream that is supposed to be input to Samza application
- * @param messages messages used to initialize the single partition stream
+ * @param messages messages used to initialize the single partition stream. These message should always be deserialized
* @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}.
* key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
* {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
@@ -206,7 +209,8 @@ public class TestRunner {
* Adds the provided input stream with mock data to the test application. Default configs and user added configs have
* a higher precedence over system and stream descriptor generated configs.
* @param descriptor describes the stream that is supposed to be input to Samza application
- * @param messages map whose key is partitionId and value is messages in the partition
+ * @param messages map whose key is partitionId and value is messages in the partition. These message should always
+ * be deserialized
* @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
* A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
* {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
@@ -243,6 +247,7 @@ public class TestRunner {
.createStream(spec);
addConfig(streamDescriptor.toConfig());
addConfig(streamDescriptor.getSystemDescriptor().toConfig());
+ addSerdeConfigs(streamDescriptor);
return this;
}
@@ -359,6 +364,7 @@ public class TestRunner {
imsd.withInMemoryScope(this.inMemoryScope);
addConfig(descriptor.toConfig());
addConfig(descriptor.getSystemDescriptor().toConfig());
+ addSerdeConfigs(descriptor);
StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
SystemFactory factory = new InMemorySystemFactory();
Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
@@ -391,4 +397,17 @@ public class TestRunner {
LOG.warn("Could not delete the directory " + path);
}
}
+
+ /**
+ * Test Framework only supports NoOpSerde. This method ensures null key and msg serde config for input and output streams
+ * takes preference when configs are merged in {@link org.apache.samza.execution.JobPlanner#getExecutionPlan}
+ * over {@link org.apache.samza.application.descriptors.ApplicationDescriptor} generated configs
+ */
+ private void addSerdeConfigs(StreamDescriptor descriptor) {
+ String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), descriptor.getStreamId());
+ String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
+ String msgSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
+ this.configs.put(keySerdeConfigKey, null);
+ this.configs.put(msgSerdeConfigKey, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 476c0dc..b629317 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -33,6 +34,7 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
@@ -57,16 +59,23 @@ public class StreamApplicationIntegrationTest {
@Test
public void testStatefulJoinWithLocalTable() {
- List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
- List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+ Random random = new Random();
+ List<KV<String, TestTableData.PageView>> pageViews = Arrays.asList(TestTableData.generatePageViews(10))
+ .stream()
+ .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
+ .collect(Collectors.toList());
+ List<KV<String, TestTableData.Profile>> profiles = Arrays.asList(TestTableData.generateProfiles(10))
+ .stream()
+ .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
+ .collect(Collectors.toList());
InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
- InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
- .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+ InMemoryInputDescriptor<KV<String, TestTableData.PageView>> pageViewStreamDesc = isd
+ .getInputDescriptor("PageView", new NoOpSerde<KV<String, TestTableData.PageView>>());
- InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
- .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+ InMemoryInputDescriptor<KV<String, TestTableData.Profile>> profileStreamDesc = isd
+ .getInputDescriptor("Profile", new NoOpSerde<KV<String, TestTableData.Profile>>())
.shouldBootstrap();
InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
@@ -127,6 +136,7 @@ public class StreamApplicationIntegrationTest {
.run(Duration.ofMillis(1000));
}
+
private static class PageViewProfileViewJoinApplication implements StreamApplication {
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
@@ -135,16 +145,23 @@ public class StreamApplicationIntegrationTest {
KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
- KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- appDescriptor.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
- KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
+ ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+
+ appDescriptor
+ .getInputStream(profileISD)
+ .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
+ .sendTo(table);
+
+ KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
+ ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
- ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+ ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());
+
OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
appDescriptor.getInputStream(pageViewISD)
- .partitionBy(TestTableData.PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
- TestTableData.PageView.class)), "p1")
+ .partitionBy(pv -> pv.getValue().getMemberId() , pv -> pv.getValue(), KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
.join(table, new PageViewToProfileJoinFunction())
.sendTo(outputStream);
}
@@ -155,7 +172,7 @@ public class StreamApplicationIntegrationTest {
public void describe(StreamApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
KafkaInputDescriptor<KV<String, PageView>> isd =
- ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+ ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/08496c96/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 5fee762..2137a46 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.samza.context.Context;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
@@ -209,8 +210,8 @@ public class StreamTaskIntegrationTest {
@Override
public void describe(TaskApplicationDescriptor appDescriptor) {
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
- KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new JsonSerdeV2<>());
+ KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new JsonSerdeV2<>());
KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
appDescriptor