You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/01 22:11:58 UTC
samza git commit: SAMZA-1852: Adding default job system in TestRunner,
disabling host affinity to support TableDescriptors and refining addConfig
method for TestRunner API
Repository: samza
Updated Branches:
refs/heads/master 334d24e68 -> 094ff1641
SAMZA-1852: Adding default job system in TestRunner, disabling host affinity to support TableDescriptors and refining addConfig method for TestRunner API
- The default system is a required config for intermediate streams. Since users never write assertions against intermediate streams, providing a default makes it easier for users to write tests
- To support stateful jobs that use Table API descriptors, host affinity must be disabled, because the Table API enables it by default
- vjagadish pointed out that having both addConfigs and addOverrideConfig made the user-facing API confusing. We now support only addConfig, with overloaded signatures; configs added this way take precedence over any descriptor-generated or TestRunner-generated configs
Author: Sanil15 <sa...@gmail.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Yi Pan <ni...@gmail.com>
Closes #651 from Sanil15/SAMZA-1852
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/094ff164
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/094ff164
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/094ff164
Branch: refs/heads/master
Commit: 094ff1641c330e87f081d650b9066a0842f77963
Parents: 334d24e
Author: Sanil15 <sa...@gmail.com>
Authored: Mon Oct 1 15:11:54 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Mon Oct 1 15:11:54 2018 -0700
----------------------------------------------------------------------
.../apache/samza/test/framework/TestRunner.java | 64 ++++++++++----------
.../AsyncStreamTaskIntegrationTest.java | 2 +-
.../StreamApplicationIntegrationTest.java | 23 -------
.../framework/StreamTaskIntegrationTest.java | 4 +-
.../table/TestLocalTableWithSideInputs.java | 14 ++---
5 files changed, 40 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fe8581b..a1103dd 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -33,7 +33,9 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
@@ -74,11 +76,14 @@ import org.junit.Assert;
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
* <li>"job.name" = "test-samza"</li>
* <li>"processor.id" = "1"</li>
+ * <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li>
+ * <li>"job.host-affinity.enabled" = "false"</li>
* </ol>
*
*/
public class TestRunner {
- public static final String JOB_NAME = "samza-test";
+ private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
+ private static final String JOB_NAME = "samza-test";
private Map<String, String> configs;
private SamzaApplication app;
@@ -96,6 +101,11 @@ public class TestRunner {
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+ addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
+ // This is important because Table Api enables host affinity by default for RocksDb
+ addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
+ addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
+ addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
}
/**
@@ -142,13 +152,17 @@ public class TestRunner {
}
/**
- * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
- * @param config configs for the application
+ * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs
+ *
+ * @param key of the config
+ * @param value of the config
* @return this {@link TestRunner}
*/
- public TestRunner addConfigs(Map<String, String> config) {
- Preconditions.checkNotNull(config);
- config.forEach(this.configs::putIfAbsent);
+ public TestRunner addConfig(String key, String value) {
+ Preconditions.checkNotNull(key);
+ Preconditions.checkNotNull(value);
+ String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
+ configs.put(String.format("%s%s", configPrefix, key), value);
return this;
}
@@ -157,24 +171,10 @@ public class TestRunner {
* @param config configs for the application
* @return this {@link TestRunner}
*/
- public TestRunner addConfigs(Map<String, String> config, String configPrefix) {
+ public TestRunner addConfig(Map<String, String> config) {
Preconditions.checkNotNull(config);
- config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value));
- return this;
- }
-
- /**
- * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already
- * exisiting in {@code configs}
- * @param key key of the config
- * @param value value of the config
- * @return this {@link TestRunner}
- */
- public TestRunner addOverrideConfig(String key, String value) {
- Preconditions.checkNotNull(key);
- Preconditions.checkNotNull(value);
- String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
- configs.put(String.format("%s%s", configKeyPrefix, key), value);
+ String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
+ config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value));
return this;
}
@@ -202,7 +202,8 @@ public class TestRunner {
}
/**
- * Adds the provided input stream with mock data to the test application.
+ * Adds the provided input stream with mock data to the test application. Default configs and user added configs have
+ * a higher precedence over system and stream descriptor generated configs.
* @param descriptor describes the stream that is supposed to be input to Samza application
* @param messages map whose key is partitionId and value is messages in the partition
* @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
@@ -220,12 +221,13 @@ public class TestRunner {
}
/**
- * Adds the provided output stream to the test application.
+ * Adds the provided output stream to the test application. Default configs and user added configs have a higher
+ * precedence over system and stream descriptor generated configs.
* @param streamDescriptor describes the stream that is supposed to be output for the Samza application
* @param partitionCount partition count of output stream
* @return this {@link TestRunner}
*/
- public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) {
+ public TestRunner addOutputStream(InMemoryOutputDescriptor<?> streamDescriptor, int partitionCount) {
Preconditions.checkNotNull(streamDescriptor);
Preconditions.checkState(partitionCount >= 1);
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor();
@@ -238,8 +240,8 @@ public class TestRunner {
factory
.getAdmin(streamDescriptor.getSystemName(), config)
.createStream(spec);
- addConfigs(streamDescriptor.toConfig());
- addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
+ addConfig(streamDescriptor.toConfig());
+ addConfig(streamDescriptor.getSystemDescriptor().toConfig());
return this;
}
@@ -340,7 +342,7 @@ public class TestRunner {
* messages in the partition
* @param descriptor describes a stream to initialize with the in memory system
*/
- private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor,
+ private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
Map<Integer, Iterable<StreamMessageType>> partitonData) {
String systemName = descriptor.getSystemName();
String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
@@ -352,8 +354,8 @@ public class TestRunner {
}
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
imsd.withInMemoryScope(this.inMemoryScope);
- addConfigs(descriptor.toConfig());
- addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()));
+ addConfig(descriptor.toConfig());
+ addConfig(descriptor.getSystemDescriptor().toConfig());
StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
SystemFactory factory = new InMemorySystemFactory();
Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index ef9508a..f1757ab 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -123,7 +123,7 @@ public class AsyncStreamTaskIntegrationTest {
.of(MyAsyncStreamTask.class)
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
- .addOverrideConfig("task.max.concurrency", "4")
+ .addConfig("task.max.concurrency", "4")
.run(Duration.ofSeconds(2));
StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 6dd9159..4ebe95f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -96,7 +96,6 @@ public class StreamApplicationIntegrationTest {
.of(pageViewRepartition)
.addInputStream(imid, pageviews)
.addOutputStream(imod, 10)
- .addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1500));
Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
@@ -109,27 +108,6 @@ public class StreamApplicationIntegrationTest {
}
/**
- * Job should fail since it is missing config "job.default.system" for partitionBy Operator
- */
- @Test(expected = SamzaException.class)
- public void testSamzaJobStartMissingConfigFailureForStreamApplication() {
-
- InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
-
- InMemoryInputDescriptor<PageView> imid = isd
- .getInputDescriptor("PageView", new NoOpSerde<PageView>());
-
- InMemoryOutputDescriptor<PageView> imod = isd
- .getOutputDescriptor("Output", new NoOpSerde<PageView>());
-
- TestRunner
- .of(pageViewRepartition)
- .addInputStream(imid, new ArrayList<>())
- .addOutputStream(imod, 10)
- .run(Duration.ofMillis(1000));
- }
-
- /**
* Null page key is passed in input data which should fail filter logic
*/
@Test(expected = SamzaException.class)
@@ -154,7 +132,6 @@ public class StreamApplicationIntegrationTest {
TestRunner.of(pageViewFilter)
.addInputStream(imid, pageviews)
.addOutputStream(imod, 10)
- .addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1000));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index bc5cba7..55021d3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -102,7 +102,7 @@ public class StreamTaskIntegrationTest {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
- .addOverrideConfig("job.container.thread.pool.size", "4")
+ .addConfig("job.container.thread.pool.size", "4")
.run(Duration.ofSeconds(1));
StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
@@ -149,7 +149,7 @@ public class StreamTaskIntegrationTest {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
- .addOverrideConfig("job.container.thread.pool.size", "4")
+ .addConfig("job.container.thread.pool.size", "4")
.run(Duration.ofSeconds(2));
StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
http://git-wip-us.apache.org/repos/asf/samza/blob/094ff164/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 3c22818..814ad92 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -30,10 +30,8 @@ import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.IntegerSerde;
@@ -64,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
@Test
public void testJoinWithSideInputsTable() {
runTest(
- "side-input-join",
+ "test",
new PageViewProfileJoin(),
Arrays.asList(TestTableData.generatePageViews(10)),
Arrays.asList(TestTableData.generateProfiles(10)));
@@ -73,7 +71,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
@Test
public void testJoinWithDurableSideInputTable() {
runTest(
- "durable-side-input",
+ "test",
new DurablePageViewProfileJoin(),
Arrays.asList(TestTableData.generatePageViews(5)),
Arrays.asList(TestTableData.generateProfiles(5)));
@@ -85,7 +83,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
- configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
@@ -103,8 +100,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
.addInputStream(pageViewStreamDesc, pageViews)
.addInputStream(profileStreamDesc, profiles)
.addOutputStream(outputStreamDesc, 1)
- .addConfigs(new MapConfig(configs))
- .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
+ .addConfig(new MapConfig(configs))
.run(Duration.ofMillis(100000));
try {
@@ -135,7 +131,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
public void describe(StreamApplicationDescriptor appDesc) {
Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor());
KafkaSystemDescriptor sd =
- new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
+ new KafkaSystemDescriptor("test");
appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
.partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view")
.join(table, new PageViewToProfileJoinFunction())
@@ -148,7 +144,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
.withSideInputsProcessor((msg, store) -> {
Profile profile = (Profile) msg.getMessage();
int key = profile.getMemberId();
-
return ImmutableList.of(new Entry<>(key, profile));
});
}
@@ -162,7 +157,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
.withSideInputsProcessor((msg, store) -> {
TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
int key = profile.getMemberId();
-
return ImmutableList.of(new Entry<>(key, profile));
});
}