You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/29 21:54:19 UTC

[2/2] samza git commit: SAMZA-1967: Tests failing when Job uses any serde other than NoOp

SAMZA-1967: Tests failing when Job uses any serde other than NoOp

Context: Serdes are configured in JobNodeConfigurationGenerator; StreamDescriptor#toConfig does not generate key and msg serde configs.

Problem: Tests fail when a job uses any serde other than NoOpSerde, because the ApplicationDescriptor-generated serde configs take precedence in the absence of any user-supplied configs.

Solution: Passing null key and msg serde configs in the user configs for each StreamDescriptor ensures that the ApplicationDescriptor-generated serde configs don't take precedence.

prateekm rmatharu please take a look

Author: Sanil15 <sa...@gmail.com>

Reviewers: Prateek M<pm...@linkedin.com>

Closes #764 from Sanil15/SAMZA-1967


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bdae04b0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bdae04b0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bdae04b0

Branch: refs/heads/1.0.0
Commit: bdae04b097dc89d44ee93694cbc47ea07fe81d3c
Parents: 85d19bb
Author: Sanil15 <sa...@gmail.com>
Authored: Fri Oct 26 17:51:58 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 29 13:34:36 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/test/framework/TestRunner.java | 23 ++++++++++-
 .../StreamApplicationIntegrationTest.java       | 43 ++++++++++++++------
 .../framework/StreamTaskIntegrationTest.java    |  5 ++-
 3 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bdae04b0/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 531b0ef..c80ce1b 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -39,6 +39,7 @@ import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.job.ApplicationStatus;
@@ -55,6 +56,7 @@ import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.descriptors.StreamDescriptor;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.task.AsyncStreamTask;
 import org.apache.samza.task.StreamTask;
@@ -81,6 +83,7 @@ import org.slf4j.LoggerFactory;
  *    <li>"job.host-affinity.enabled" = "false"</li>
  *  </ol>
  *
+ * TestRunner only supports NoOpSerde, i.e. inputs to the test framework should already be deserialized.
  */
 public class TestRunner {
   private static final Logger LOG = LoggerFactory.getLogger(TestRunner.class);
@@ -187,7 +190,7 @@ public class TestRunner {
    * Adds the provided input stream with mock data to the test application.
    *
    * @param descriptor describes the stream that is supposed to be input to Samza application
-   * @param messages messages used to initialize the single partition stream
+   * @param messages messages used to initialize the single partition stream. These messages should always be deserialized
    * @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}.
    *                            key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
    *                           {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
@@ -206,7 +209,8 @@ public class TestRunner {
    * Adds the provided input stream with mock data to the test application. Default configs and user added configs have
    * a higher precedence over system and stream descriptor generated configs.
    * @param descriptor describes the stream that is supposed to be input to Samza application
-   * @param messages map whose key is partitionId and value is messages in the partition
+   * @param messages map whose key is partitionId and value is messages in the partition. These messages should always
+   *                 be deserialized
    * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
    *                           A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
    *                           {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
@@ -243,6 +247,7 @@ public class TestRunner {
         .createStream(spec);
     addConfig(streamDescriptor.toConfig());
     addConfig(streamDescriptor.getSystemDescriptor().toConfig());
+    addSerdeConfigs(streamDescriptor);
     return this;
   }
 
@@ -359,6 +364,7 @@ public class TestRunner {
     imsd.withInMemoryScope(this.inMemoryScope);
     addConfig(descriptor.toConfig());
     addConfig(descriptor.getSystemDescriptor().toConfig());
+    addSerdeConfigs(descriptor);
     StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
     SystemFactory factory = new InMemorySystemFactory();
     Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
@@ -391,4 +397,17 @@ public class TestRunner {
       LOG.warn("Could not delete the directory " + path);
     }
   }
+
+  /**
+   * The test framework only supports NoOpSerde. This method ensures that null key and msg serde configs for input and
+   * output streams take precedence over {@link org.apache.samza.application.descriptors.ApplicationDescriptor} generated
+   * configs when configs are merged in {@link org.apache.samza.execution.JobPlanner#getExecutionPlan}
+   */
+  private void addSerdeConfigs(StreamDescriptor descriptor) {
+    String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), descriptor.getStreamId());
+    String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
+    String msgSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
+    this.configs.put(keySerdeConfigKey, null);
+    this.configs.put(msgSerdeConfigKey, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bdae04b0/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 476c0dc..b629317 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -33,6 +34,7 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
@@ -57,16 +59,23 @@ public class StreamApplicationIntegrationTest {
 
   @Test
   public void testStatefulJoinWithLocalTable() {
-    List<TestTableData.PageView> pageViews = Arrays.asList(TestTableData.generatePageViews(10));
-    List<TestTableData.Profile> profiles = Arrays.asList(TestTableData.generateProfiles(10));
+    Random random = new Random();
+    List<KV<String, TestTableData.PageView>> pageViews = Arrays.asList(TestTableData.generatePageViews(10))
+        .stream()
+        .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
+        .collect(Collectors.toList());
+    List<KV<String, TestTableData.Profile>> profiles = Arrays.asList(TestTableData.generateProfiles(10))
+        .stream()
+        .map(x -> KV.of(PAGEKEYS[random.nextInt(PAGEKEYS.length)], x))
+        .collect(Collectors.toList());
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
 
-    InMemoryInputDescriptor<TestTableData.PageView> pageViewStreamDesc = isd
-        .getInputDescriptor("PageView", new NoOpSerde<TestTableData.PageView>());
+    InMemoryInputDescriptor<KV<String, TestTableData.PageView>> pageViewStreamDesc = isd
+        .getInputDescriptor("PageView", new NoOpSerde<KV<String, TestTableData.PageView>>());
 
-    InMemoryInputDescriptor<TestTableData.Profile> profileStreamDesc = isd
-        .getInputDescriptor("Profile", new NoOpSerde<TestTableData.Profile>())
+    InMemoryInputDescriptor<KV<String, TestTableData.Profile>> profileStreamDesc = isd
+        .getInputDescriptor("Profile", new NoOpSerde<KV<String, TestTableData.Profile>>())
         .shouldBootstrap();
 
     InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc = isd
@@ -127,6 +136,7 @@ public class StreamApplicationIntegrationTest {
         .run(Duration.ofMillis(1000));
   }
 
+
   private static class PageViewProfileViewJoinApplication implements StreamApplication {
     @Override
     public void describe(StreamApplicationDescriptor appDescriptor) {
@@ -135,16 +145,23 @@ public class StreamApplicationIntegrationTest {
               KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
 
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
-      KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      appDescriptor.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
 
-      KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
+          ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+
+      appDescriptor
+          .getInputStream(profileISD)
+          .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
+          .sendTo(table);
+
+      KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
+          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
       KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
-          ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+          ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());
+
       OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
       appDescriptor.getInputStream(pageViewISD)
-          .partitionBy(TestTableData.PageView::getMemberId,  pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
-              TestTableData.PageView.class)), "p1")
+          .partitionBy(pv -> pv.getValue().getMemberId() ,  pv -> pv.getValue(), KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
           .join(table, new PageViewToProfileJoinFunction())
           .sendTo(outputStream);
     }
@@ -155,7 +172,7 @@ public class StreamApplicationIntegrationTest {
     public void describe(StreamApplicationDescriptor appDescriptor) {
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
       KafkaInputDescriptor<KV<String, PageView>> isd =
-          ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+          ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
       MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
       inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/bdae04b0/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 5fee762..2137a46 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
@@ -209,8 +210,8 @@ public class StreamTaskIntegrationTest {
     @Override
     public void describe(TaskApplicationDescriptor appDescriptor) {
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
-      KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new JsonSerdeV2<>());
+      KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new JsonSerdeV2<>());
       KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
           ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
       appDescriptor