You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/11/13 17:27:41 UTC

[samza] branch 1.3.0 updated: SAMZA-2377: Batch mode should be computed based on generated configs (#1215)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.3.0 by this push:
     new eeee4b5  SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
eeee4b5 is described below

commit eeee4b5e0b629376c860e811ec46ec8dae4321c5
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Nov 12 14:44:41 2019 -0800

    SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
---
 .../org/apache/samza/execution/JobPlanner.java     | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f300804..f8e0684 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -31,6 +31,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -85,6 +86,10 @@ public abstract class JobPlanner {
       generatedConfig.putAll(getGeneratedConfig(runId));
     }
 
+    if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
+      allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
+    }
+
     // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
@@ -126,17 +131,23 @@ public abstract class JobPlanner {
       generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    StreamConfig streamConfig = new StreamConfig(userConfig);
+    Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
+    generatedConfig.putAll(systemStreamConfigs);
+
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig));
     Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
     inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
-    ApplicationConfig.ApplicationMode mode =
-        inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
-            ? ApplicationConfig.ApplicationMode.BATCH
-            : ApplicationConfig.ApplicationMode.STREAM;
-    generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
 
-    Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
-    generatedConfig.putAll(systemStreamConfigs);
+    final ApplicationConfig.ApplicationMode mode;
+    if (inputStreamIds.isEmpty()) {
+      mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default
+    } else {
+      mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+          ? ApplicationConfig.ApplicationMode.BATCH
+          : ApplicationConfig.ApplicationMode.STREAM;
+    }
+
+    generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
 
     // adding app.class in the configuration, unless it is LegacyTaskApplication
     if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {