You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/06/04 20:09:00 UTC
[samza] branch master updated: Samza-2540: Legacy Task application
are missing app.run.id for both batch & stream jobs (#1375)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c4f2c5d Samza-2540: Legacy Task application are missing app.run.id for both batch & stream jobs (#1375)
c4f2c5d is described below
commit c4f2c5d92ec8762fbc1e058bcc1df314972fb53c
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Thu Jun 4 13:08:52 2020 -0700
Samza-2540: Legacy Task application are missing app.run.id for both batch & stream jobs (#1375)
---
.../org/apache/samza/execution/JobPlanner.java | 14 +++++++----
.../org/apache/samza/execution/TestJobPlanner.java | 27 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 72c2972..416a769 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -83,13 +83,20 @@ public abstract class JobPlanner {
if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) {
allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
}
- generatedConfig.putAll(getGeneratedConfig(runId));
+ generatedConfig.putAll(getGeneratedConfig());
}
if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
}
+ // APP_RUN_ID should be generated for both LegacyTaskApplications and descriptor-based applications.
+ // This config is used in BATCH mode to create new intermediate streams on each run, and in STREAM mode it is
+ // used by Container Placements to identify a deployment of Samza.
+ if (StringUtils.isNoneEmpty(runId)) {
+ generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
+ }
+
// merge user-provided configuration with generated configuration. generated configuration has lower priority.
Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
@@ -125,11 +132,8 @@ public abstract class JobPlanner {
}
}
- private Map<String, String> getGeneratedConfig(String runId) {
+ private Map<String, String> getGeneratedConfig() {
Map<String, String> generatedConfig = new HashMap<>();
- if (StringUtils.isNoneEmpty(runId)) {
- generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
- }
Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
generatedConfig.putAll(systemStreamConfigs);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
index 49bfd7f..68e4919 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
@@ -20,10 +20,15 @@
package org.apache.samza.execution;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.samza.application.LegacyTaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestJobPlanner {
@@ -59,4 +64,26 @@ public class TestJobPlanner {
Assert.assertEquals(generatedConfig.get("job.id"), "should-exist-id");
}
+ /**
+ * Verifies that the run id passed to {@code getExecutionPlan} is present in the resulting application config
+ * when the application class is {@link LegacyTaskApplication}.
+ */
+ @Test
+ public void testRunIdisConfiguredForAllTypesOfApps() {
+ Map<String, String> testConfig = new HashMap<>();
+ testConfig.put("app.id", "should-exist-id");
+ testConfig.put("app.name", "should-exist-name");
+
+ ApplicationDescriptorImpl applicationDescriptor = Mockito.mock(ApplicationDescriptorImpl.class);
+
+ Mockito.when(applicationDescriptor.getConfig()).thenReturn(new MapConfig(testConfig));
+ Mockito.when(applicationDescriptor.getAppClass()).thenReturn(LegacyTaskApplication.class);
+
+ JobPlanner jobPlanner = new JobPlanner(applicationDescriptor) {
+ @Override
+ public List<JobConfig> prepareJobs() {
+ return null;
+ }
+ };
+
+ ExecutionPlan plan = jobPlanner.getExecutionPlan("custom-run-id");
+ // Compare against the expected value: assertNotNull(String, Object) would treat the first argument as the
+ // failure message and only null-check the literal, so it could never fail.
+ Assert.assertEquals("custom-run-id", plan.getApplicationConfig().getRunId());
+ }
+
}