You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/20 17:39:40 UTC
[samza] branch master updated: SAMZA-2135: Provide a way to inject
ExternalContext to TestRunner
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 55ce288 SAMZA-2135: Provide a way to inject ExternalContext to TestRunner
55ce288 is described below
commit 55ce28850090dbb8fe410b9953eed35be062dcc7
Author: Sanil15 <sa...@gmail.com>
AuthorDate: Wed Mar 20 10:39:35 2019 -0700
SAMZA-2135: Provide a way to inject ExternalContext to TestRunner
- Added a public API to pass ExternalContext to TestRunner
- Refactored existing tests to test it
Author: Sanil15 <sa...@gmail.com>
Reviewers: Cameron Lee <ca...@linkedin.com>
Closes #962 from Sanil15/SAMZA-2135
---
.../apache/samza/test/framework/TestRunner.java | 27 ++++++++++++----------
.../samza/test/framework/MyStreamTestTask.java | 14 +++++++++--
.../test/framework/StreamTaskIntegrationTest.java | 21 +++++++++++++++--
3 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index b2c24a2..c2fbfa7 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.RandomStringUtils;
@@ -97,6 +96,7 @@ public class TestRunner {
private Map<String, String> configs;
private SamzaApplication app;
+ private ExternalContext externalContext;
/*
* inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor}
* provides an isolated state to run with in memory system
@@ -182,7 +182,7 @@ public class TestRunner {
}
/**
- * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
+ * Adds a config to the Samza application. This config takes precedence over default configs and descriptor-generated configs.
* @param config configs for the application
* @return this {@link TestRunner}
*/
@@ -193,6 +193,18 @@ public class TestRunner {
}
/**
+ * Passes the user provided external context to {@link LocalApplicationRunner}
+ *
+ * @param externalContext external context provided by user
+ * @return this {@link TestRunner}
+ */
+ public TestRunner addExternalContext(ExternalContext externalContext) {
+ Preconditions.checkNotNull(externalContext);
+ this.externalContext = externalContext;
+ return this;
+ }
+
+ /**
* Adds the provided input stream with mock data to the test application.
*
* @param descriptor describes the stream that is supposed to be input to Samza application
@@ -272,7 +284,7 @@ public class TestRunner {
deleteStoreDirectories();
Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- runner.run(buildExternalContext(config).orElse(null));
+ runner.run(externalContext);
if (!runner.waitForFinish(timeout)) {
throw new SamzaException("Timed out waiting for application to finish");
}
@@ -417,13 +429,4 @@ public class TestRunner {
this.configs.put(keySerdeConfigKey, null);
this.configs.put(msgSerdeConfigKey, null);
}
-
- private static Optional<ExternalContext> buildExternalContext(Config config) {
- /*
- * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
- * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components
- * like the application descriptor may not be available.
- */
- return Optional.empty();
- }
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
index a07fe74..a2cbdc5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
@@ -19,20 +19,30 @@
package org.apache.samza.test.framework;
+import org.apache.samza.context.Context;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
-public class MyStreamTestTask implements StreamTask {
+public class MyStreamTestTask implements StreamTask, InitableTask {
+ private int multiplier;
+
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
throws Exception {
Integer obj = (Integer) envelope.getMessage();
collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"),
- envelope.getKey(), envelope.getKey(), obj * 10));
+ envelope.getKey(), envelope.getKey(), obj * multiplier));
+ }
+
+ @Override
+ public void init(Context context) throws Exception {
+ TestContext testContext = (TestContext) context.getExternalContext();
+ multiplier = testContext.getMultiplier();
}
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index ab650f2..1b57352 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.context.Context;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.ExternalContext;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
@@ -60,7 +61,6 @@ import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
import static org.apache.samza.test.table.TestTableData.PageView;
import static org.apache.samza.test.table.TestTableData.Profile;
-
public class StreamTaskIntegrationTest {
@Test
@@ -107,6 +107,7 @@ public class StreamTaskIntegrationTest {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
+ .addExternalContext(new TestContext(10))
.run(Duration.ofSeconds(1));
Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
@@ -133,6 +134,7 @@ public class StreamTaskIntegrationTest {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
+ .addExternalContext(new TestContext(10))
.run(Duration.ofSeconds(1));
}
@@ -154,6 +156,7 @@ public class StreamTaskIntegrationTest {
.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
.addConfig("job.container.thread.pool.size", "4")
+ .addExternalContext(new TestContext(10))
.run(Duration.ofSeconds(1));
StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
@@ -177,6 +180,7 @@ public class StreamTaskIntegrationTest {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
+ .addExternalContext(new TestContext(10))
.run(Duration.ofSeconds(2));
StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -201,6 +205,7 @@ public class StreamTaskIntegrationTest {
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
.addConfig("job.container.thread.pool.size", "4")
+ .addExternalContext(new TestContext(10))
.run(Duration.ofSeconds(2));
StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -263,4 +268,16 @@ public class StreamTaskIntegrationTest {
}
}
-}
\ No newline at end of file
+}
+
+class TestContext implements ExternalContext {
+ final private int multiplier;
+
+ public TestContext(int multiplier) {
+ this.multiplier = multiplier;
+ }
+
+ public int getMultiplier() {
+ return this.multiplier;
+ }
+}