You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/06/04 20:09:00 UTC

[samza] branch master updated: Samza-2540: Legacy Task application are missing app.run.id for both batch & stream jobs (#1375)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f2c5d  Samza-2540: Legacy Task application are missing app.run.id for both batch & stream jobs (#1375)
c4f2c5d is described below

commit c4f2c5d92ec8762fbc1e058bcc1df314972fb53c
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Thu Jun 4 13:08:52 2020 -0700

    Samza-2540: Legacy Task application are missing app.run.id for both batch & stream jobs (#1375)
---
 .../org/apache/samza/execution/JobPlanner.java     | 14 +++++++----
 .../org/apache/samza/execution/TestJobPlanner.java | 27 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 72c2972..416a769 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -83,13 +83,20 @@ public abstract class JobPlanner {
       if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) {
         allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
       }
-      generatedConfig.putAll(getGeneratedConfig(runId));
+      generatedConfig.putAll(getGeneratedConfig());
     }
 
     if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
       allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
     }
 
+    // APP_RUN_ID should be generated for both LegacyTaskApplications & descriptor based applications
+    // This config is used in BATCH mode to create new intermediate streams on runs and in stream mode use by
+    // Container Placements to identify a deployment of Samza
+    if (StringUtils.isNoneEmpty(runId)) {
+      generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
+    }
+
     // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
@@ -125,11 +132,8 @@ public abstract class JobPlanner {
     }
   }
 
-  private Map<String, String> getGeneratedConfig(String runId) {
+  private Map<String, String> getGeneratedConfig() {
     Map<String, String> generatedConfig = new HashMap<>();
-    if (StringUtils.isNoneEmpty(runId)) {
-      generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
-    }
 
     Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
     generatedConfig.putAll(systemStreamConfigs);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
index 49bfd7f..68e4919 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobPlanner.java
@@ -20,10 +20,15 @@
 package org.apache.samza.execution;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.samza.application.LegacyTaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestJobPlanner {
 
@@ -59,4 +64,26 @@ public class TestJobPlanner {
     Assert.assertEquals(generatedConfig.get("job.id"), "should-exist-id");
   }
 
+  @Test
+  public void testRunIdisConfiguredForAllTypesOfApps() {
+    Map<String, String> testConfig = new HashMap<>();
+    testConfig.put("app.id", "should-exist-id");
+    testConfig.put("app.name", "should-exist-name");
+
+    ApplicationDescriptorImpl applicationDescriptor = Mockito.mock(ApplicationDescriptorImpl.class);
+
+    Mockito.when(applicationDescriptor.getConfig()).thenReturn(new MapConfig(testConfig));
+    Mockito.when(applicationDescriptor.getAppClass()).thenReturn(LegacyTaskApplication.class);
+
+    JobPlanner jobPlanner = new JobPlanner(applicationDescriptor) {
+      @Override
+      public List<JobConfig> prepareJobs() {
+        return null;
+      }
+    };
+
+    ExecutionPlan plan = jobPlanner.getExecutionPlan("custom-run-id");
+    Assert.assertNotNull(plan.getApplicationConfig().getRunId(), "custom-run-id");
+  }
+
 }