You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/20 17:39:40 UTC

[samza] branch master updated: SAMZA-2135: Provide a way to inject ExternalContext to TestRunner

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ce288  SAMZA-2135: Provide a way to inject ExternalContext to TestRunner
55ce288 is described below

commit 55ce28850090dbb8fe410b9953eed35be062dcc7
Author: Sanil15 <sa...@gmail.com>
AuthorDate: Wed Mar 20 10:39:35 2019 -0700

    SAMZA-2135: Provide a way to inject ExternalContext to TestRunner
    
    - Added a public API to pass ExternalContext to TestRunner
    - Refactored existing tests to test it
    
    Author: Sanil15 <sa...@gmail.com>
    
    Reviewers: Cameron Lee <ca...@linkedin.com>
    
    Closes #962 from Sanil15/SAMZA-2135
---
 .../apache/samza/test/framework/TestRunner.java    | 27 ++++++++++++----------
 .../samza/test/framework/MyStreamTestTask.java     | 14 +++++++++--
 .../test/framework/StreamTaskIntegrationTest.java  | 21 +++++++++++++++--
 3 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index b2c24a2..c2fbfa7 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
@@ -97,6 +96,7 @@ public class TestRunner {
 
   private Map<String, String> configs;
   private SamzaApplication app;
+  private ExternalContext externalContext;
   /*
    * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor}
    * provides an isolated state to run with in memory system
@@ -182,7 +182,7 @@ public class TestRunner {
   }
 
   /**
-   * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
+   * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs
    * @param config configs for the application
    * @return this {@link TestRunner}
    */
@@ -193,6 +193,18 @@ public class TestRunner {
   }
 
   /**
+   * Passes the user provided external context to {@link LocalApplicationRunner}
+   *
+   * @param externalContext external context provided by user
+   * @return this {@link TestRunner}
+   */
+  public TestRunner addExternalContext(ExternalContext externalContext) {
+    Preconditions.checkNotNull(externalContext);
+    this.externalContext = externalContext;
+    return this;
+  }
+
+  /**
    * Adds the provided input stream with mock data to the test application.
    *
    * @param descriptor describes the stream that is supposed to be input to Samza application
@@ -272,7 +284,7 @@ public class TestRunner {
     deleteStoreDirectories();
     Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
     final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    runner.run(buildExternalContext(config).orElse(null));
+    runner.run(externalContext);
     if (!runner.waitForFinish(timeout)) {
       throw new SamzaException("Timed out waiting for application to finish");
     }
@@ -417,13 +429,4 @@ public class TestRunner {
     this.configs.put(keySerdeConfigKey, null);
     this.configs.put(msgSerdeConfigKey, null);
   }
-
-  private static Optional<ExternalContext> buildExternalContext(Config config) {
-    /*
-     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
-     * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components
-     * like the application descriptor may not be available.
-     */
-    return Optional.empty();
-  }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
index a07fe74..a2cbdc5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
@@ -19,20 +19,30 @@
 
 package org.apache.samza.test.framework;
 
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskCoordinator;
 
 
-public class MyStreamTestTask implements StreamTask {
+public class MyStreamTestTask implements StreamTask, InitableTask {
+  private int multiplier;
+
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
       throws Exception {
     Integer obj = (Integer) envelope.getMessage();
     collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"),
-        envelope.getKey(), envelope.getKey(), obj * 10));
+        envelope.getKey(), envelope.getKey(), obj * multiplier));
+  }
+
+  @Override
+  public void init(Context context) throws Exception {
+    TestContext testContext = (TestContext) context.getExternalContext();
+    multiplier = testContext.getMultiplier();
   }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index ab650f2..1b57352 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.context.Context;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.operators.KV;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
@@ -60,7 +61,6 @@ import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 
-
 public class StreamTaskIntegrationTest {
 
   @Test
@@ -107,6 +107,7 @@ public class StreamTaskIntegrationTest {
         .of(MyStreamTestTask.class)
         .addInputStream(imid, inputList)
         .addOutputStream(imod, 1)
+        .addExternalContext(new TestContext(10))
         .run(Duration.ofSeconds(1));
 
     Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
@@ -133,6 +134,7 @@ public class StreamTaskIntegrationTest {
         .of(MyStreamTestTask.class)
         .addInputStream(imid, inputList)
         .addOutputStream(imod, 1)
+        .addExternalContext(new TestContext(10))
         .run(Duration.ofSeconds(1));
   }
 
@@ -154,6 +156,7 @@ public class StreamTaskIntegrationTest {
         .addInputStream(imid, inputList)
         .addOutputStream(imod, 1)
         .addConfig("job.container.thread.pool.size", "4")
+        .addExternalContext(new TestContext(10))
         .run(Duration.ofSeconds(1));
 
     StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
@@ -177,6 +180,7 @@ public class StreamTaskIntegrationTest {
         .of(MyStreamTestTask.class)
         .addInputStream(imid, inputPartitionData)
         .addOutputStream(imod, 5)
+        .addExternalContext(new TestContext(10))
         .run(Duration.ofSeconds(2));
 
     StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -201,6 +205,7 @@ public class StreamTaskIntegrationTest {
         .addInputStream(imid, inputPartitionData)
         .addOutputStream(imod, 5)
         .addConfig("job.container.thread.pool.size", "4")
+        .addExternalContext(new TestContext(10))
         .run(Duration.ofSeconds(2));
 
     StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -263,4 +268,16 @@ public class StreamTaskIntegrationTest {
     }
   }
 
-}
\ No newline at end of file
+}
+
+class TestContext implements ExternalContext {
+  final private int multiplier;
+
+  public TestContext(int multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  public int getMultiplier() {
+    return this.multiplier;
+  }
+}