You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/01 22:11:58 UTC

samza git commit: SAMZA-1852: Adding default job system in TestRunner, disabling host affinity to support TableDescriptors and refining addConfig method for TestRunner API

Repository: samza
Updated Branches:
  refs/heads/master 334d24e68 -> 094ff1641


SAMZA-1852: Adding default job system in TestRunner, disabling host affinity to support TableDescriptors and refining addConfig method for TestRunner API

- The default system is a required config for intermediate streams. Since users will not write assertions against intermediate streams, providing a default makes it easier to write tests.
- To support stateful jobs that use Table API descriptors, we need to disable host affinity, which the Table API enables by default.
- vjagadish pointed out that having both addConfigs and addOverrideConfig made for a confusing user-facing API. We now support only addConfig, overloaded with different signatures. Configs added through it take precedence over any descriptor-generated or TestRunner-generated configs.

Author: Sanil15 <sa...@gmail.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Yi Pan <ni...@gmail.com>

Closes #651 from Sanil15/SAMZA-1852


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/094ff164
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/094ff164
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/094ff164

Branch: refs/heads/master
Commit: 094ff1641c330e87f081d650b9066a0842f77963
Parents: 334d24e
Author: Sanil15 <sa...@gmail.com>
Authored: Mon Oct 1 15:11:54 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 1 15:11:54 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/test/framework/TestRunner.java | 64 ++++++++++----------
 .../AsyncStreamTaskIntegrationTest.java         |  2 +-
 .../StreamApplicationIntegrationTest.java       | 23 -------
 .../framework/StreamTaskIntegrationTest.java    |  4 +-
 .../table/TestLocalTableWithSideInputs.java     | 14 ++---
 5 files changed, 40 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fe8581b..a1103dd 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -33,7 +33,9 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
@@ -74,11 +76,14 @@ import org.junit.Assert;
  *    <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
  *    <li>"job.name" = "test-samza"</li>
  *    <li>"processor.id" = "1"</li>
+ *    <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li>
+ *    <li>"job.host-affinity.enabled" = "false"</li>
  *  </ol>
  *
  */
 public class TestRunner {
-  public static final String JOB_NAME = "samza-test";
+  private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
+  private static final String JOB_NAME = "samza-test";
 
   private Map<String, String> configs;
   private SamzaApplication app;
@@ -96,6 +101,11 @@ public class TestRunner {
     configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+    addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
+    // This is important because Table Api enables host affinity by default for RocksDb
+    addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
+    addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
+    addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
   }
 
   /**
@@ -142,13 +152,17 @@ public class TestRunner {
   }
 
   /**
-   * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
-   * @param config configs for the application
+   * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs
+   *
+   * @param key of the config
+   * @param value of the config
    * @return this {@link TestRunner}
    */
-  public TestRunner addConfigs(Map<String, String> config) {
-    Preconditions.checkNotNull(config);
-    config.forEach(this.configs::putIfAbsent);
+  public TestRunner addConfig(String key, String value) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
+    configs.put(String.format("%s%s", configPrefix, key), value);
     return this;
   }
 
@@ -157,24 +171,10 @@ public class TestRunner {
    * @param config configs for the application
    * @return this {@link TestRunner}
    */
-  public TestRunner addConfigs(Map<String, String> config, String configPrefix) {
+  public TestRunner addConfig(Map<String, String> config) {
     Preconditions.checkNotNull(config);
-    config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value));
-    return this;
-  }
-
-  /**
-   * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already
-   * exisiting in {@code configs}
-   * @param key key of the config
-   * @param value value of the config
-   * @return this {@link TestRunner}
-   */
-  public TestRunner addOverrideConfig(String key, String value) {
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-    String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
-    configs.put(String.format("%s%s", configKeyPrefix, key), value);
+    String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
+    config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value));
     return this;
   }
 
@@ -202,7 +202,8 @@ public class TestRunner {
   }
 
   /**
-   * Adds the provided input stream with mock data to the test application.
+   * Adds the provided input stream with mock data to the test application. Default configs and user added configs have
+   * a higher precedence over system and stream descriptor generated configs.
    * @param descriptor describes the stream that is supposed to be input to Samza application
    * @param messages map whose key is partitionId and value is messages in the partition
    * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
@@ -220,12 +221,13 @@ public class TestRunner {
   }
 
   /**
-   * Adds the provided output stream to the test application.
+   * Adds the provided output stream to the test application. Default configs and user added configs have a higher
+   * precedence over system and stream descriptor generated configs.
    * @param streamDescriptor describes the stream that is supposed to be output for the Samza application
    * @param partitionCount partition count of output stream
    * @return this {@link TestRunner}
    */
-  public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) {
+  public TestRunner addOutputStream(InMemoryOutputDescriptor<?> streamDescriptor, int partitionCount) {
     Preconditions.checkNotNull(streamDescriptor);
     Preconditions.checkState(partitionCount >= 1);
     InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor();
@@ -238,8 +240,8 @@ public class TestRunner {
     factory
         .getAdmin(streamDescriptor.getSystemName(), config)
         .createStream(spec);
-    addConfigs(streamDescriptor.toConfig());
-    addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
+    addConfig(streamDescriptor.toConfig());
+    addConfig(streamDescriptor.getSystemDescriptor().toConfig());
     return this;
   }
 
@@ -340,7 +342,7 @@ public class TestRunner {
    *                 messages in the partition
    * @param descriptor describes a stream to initialize with the in memory system
    */
-  private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor,
+  private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
       Map<Integer, Iterable<StreamMessageType>> partitonData) {
     String systemName = descriptor.getSystemName();
     String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
@@ -352,8 +354,8 @@ public class TestRunner {
     }
     InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
     imsd.withInMemoryScope(this.inMemoryScope);
-    addConfigs(descriptor.toConfig());
-    addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()));
+    addConfig(descriptor.toConfig());
+    addConfig(descriptor.getSystemDescriptor().toConfig());
     StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
     SystemFactory factory = new InMemorySystemFactory();
     Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());

http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index ef9508a..f1757ab 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -123,7 +123,7 @@ public class AsyncStreamTaskIntegrationTest {
         .of(MyAsyncStreamTask.class)
         .addInputStream(imid, inputPartitionData)
         .addOutputStream(imod, 5)
-        .addOverrideConfig("task.max.concurrency", "4")
+        .addConfig("task.max.concurrency", "4")
         .run(Duration.ofSeconds(2));
 
     StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));

http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 6dd9159..4ebe95f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -96,7 +96,6 @@ public class StreamApplicationIntegrationTest {
         .of(pageViewRepartition)
         .addInputStream(imid, pageviews)
         .addOutputStream(imod, 10)
-        .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1500));
 
     Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
@@ -109,27 +108,6 @@ public class StreamApplicationIntegrationTest {
   }
 
   /**
-   * Job should fail since it is missing config "job.default.system" for partitionBy Operator
-   */
-  @Test(expected = SamzaException.class)
-  public void testSamzaJobStartMissingConfigFailureForStreamApplication() {
-
-    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
-
-    InMemoryInputDescriptor<PageView> imid = isd
-        .getInputDescriptor("PageView", new NoOpSerde<PageView>());
-
-    InMemoryOutputDescriptor<PageView> imod = isd
-        .getOutputDescriptor("Output", new NoOpSerde<PageView>());
-
-    TestRunner
-        .of(pageViewRepartition)
-        .addInputStream(imid, new ArrayList<>())
-        .addOutputStream(imod, 10)
-        .run(Duration.ofMillis(1000));
-  }
-
-  /**
    * Null page key is passed in input data which should fail filter logic
    */
   @Test(expected = SamzaException.class)
@@ -154,7 +132,6 @@ public class StreamApplicationIntegrationTest {
     TestRunner.of(pageViewFilter)
         .addInputStream(imid, pageviews)
         .addOutputStream(imod, 10)
-        .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1000));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index bc5cba7..55021d3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -102,7 +102,7 @@ public class StreamTaskIntegrationTest {
         .of(MyStreamTestTask.class)
         .addInputStream(imid, inputList)
         .addOutputStream(imod, 1)
-        .addOverrideConfig("job.container.thread.pool.size", "4")
+        .addConfig("job.container.thread.pool.size", "4")
         .run(Duration.ofSeconds(1));
 
     StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
@@ -149,7 +149,7 @@ public class StreamTaskIntegrationTest {
         .of(MyStreamTestTask.class)
         .addInputStream(imid, inputPartitionData)
         .addOutputStream(imod, 5)
-        .addOverrideConfig("job.container.thread.pool.size", "4")
+        .addConfig("job.container.thread.pool.size", "4")
         .run(Duration.ofSeconds(2));
 
     StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));

http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 3c22818..814ad92 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -30,10 +30,8 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
@@ -64,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
   @Test
   public void testJoinWithSideInputsTable() {
     runTest(
-        "side-input-join",
+        "test",
         new PageViewProfileJoin(),
         Arrays.asList(TestTableData.generatePageViews(10)),
         Arrays.asList(TestTableData.generateProfiles(10)));
@@ -73,7 +71,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
   @Test
   public void testJoinWithDurableSideInputTable() {
     runTest(
-        "durable-side-input",
+        "test",
         new DurablePageViewProfileJoin(),
         Arrays.asList(TestTableData.generatePageViews(5)),
         Arrays.asList(TestTableData.generateProfiles(5)));
@@ -85,7 +83,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
-    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
 
@@ -103,8 +100,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
         .addInputStream(pageViewStreamDesc, pageViews)
         .addInputStream(profileStreamDesc, profiles)
         .addOutputStream(outputStreamDesc, 1)
-        .addConfigs(new MapConfig(configs))
-        .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
+        .addConfig(new MapConfig(configs))
         .run(Duration.ofMillis(100000));
 
     try {
@@ -135,7 +131,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     public void describe(StreamApplicationDescriptor appDesc) {
       Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor());
       KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
+          new KafkaSystemDescriptor("test");
       appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
           .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
@@ -148,7 +144,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
           .withSideInputsProcessor((msg, store) -> {
               Profile profile = (Profile) msg.getMessage();
               int key = profile.getMemberId();
-
               return ImmutableList.of(new Entry<>(key, profile));
             });
     }
@@ -162,7 +157,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
           .withSideInputsProcessor((msg, store) -> {
               TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
               int key = profile.getMemberId();
-
               return ImmutableList.of(new Entry<>(key, profile));
             });
     }