You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2019/11/13 17:27:41 UTC
[samza] branch 1.3.0 updated: SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.3.0 by this push:
new eeee4b5 SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
eeee4b5 is described below
commit eeee4b5e0b629376c860e811ec46ec8dae4321c5
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Nov 12 14:44:41 2019 -0800
SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
---
.../org/apache/samza/execution/JobPlanner.java | 27 +++++++++++++++-------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f300804..f8e0684 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -31,6 +31,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -85,6 +86,10 @@ public abstract class JobPlanner {
generatedConfig.putAll(getGeneratedConfig(runId));
}
+ if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
+ allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
+ }
+
// merge user-provided configuration with generated configuration. generated configuration has lower priority.
Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
@@ -126,17 +131,23 @@ public abstract class JobPlanner {
generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
}
- StreamConfig streamConfig = new StreamConfig(userConfig);
+ Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
+ generatedConfig.putAll(systemStreamConfigs);
+
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig));
Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
- ApplicationConfig.ApplicationMode mode =
- inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
- ? ApplicationConfig.ApplicationMode.BATCH
- : ApplicationConfig.ApplicationMode.STREAM;
- generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
- Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
- generatedConfig.putAll(systemStreamConfigs);
+ final ApplicationConfig.ApplicationMode mode;
+ if (inputStreamIds.isEmpty()) {
+ mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default
+ } else {
+ mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+ ? ApplicationConfig.ApplicationMode.BATCH
+ : ApplicationConfig.ApplicationMode.STREAM;
+ }
+
+ generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
// adding app.class in the configuration, unless it is LegacyTaskApplication
if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {