You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/19 00:13:44 UTC

samza git commit: Do not try to generate configurations based on descriptors for LegacyTaskApplication in JobPlanner

Repository: samza
Updated Branches:
  refs/heads/master c79cc1808 -> 469050240


Do not try to generate configurations based on descriptors for LegacyTaskApplication in JobPlanner

Author: Prateek Maheshwari <pm...@apache.org>

Reviewers: Ray Mathuru <rm...@linkedin.com>, Jagadish Venkatraman <vj...@linkedin.com>, Sanil Jain <sn...@linkedin.com>

Closes #743 from prateekm/app-mode-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/46905024
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/46905024
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/46905024

Branch: refs/heads/master
Commit: 469050240aed010e774b4cd731682ebb457274c8
Parents: c79cc18
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Thu Oct 18 17:13:41 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 18 17:13:41 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/execution/JobPlanner.java    | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/46905024/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 7ea7367..b569437 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -70,17 +70,23 @@ public abstract class JobPlanner {
 
   /* package private */
   ExecutionPlan getExecutionPlan(String runId) {
-    Map<String, String> generatedConfig = getGeneratedConfig(runId);
+    Map<String, String> allowedUserConfig = new HashMap<>(userConfig);
+    Map<String, String> generatedConfig = new HashMap<>();
 
-    // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
-    Map<String, String> allowedUserConfig = new HashMap<>(userConfig);
+    // Don't generate any configurations for LegacyTaskApplications
     if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
-      LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
-          "Ignoring configured value of " + userConfig.get(TaskConfig.INPUT_STREAMS()));
-      allowedUserConfig.remove(TaskConfig.INPUT_STREAMS()); // must be set using descriptors or operators
+      if (userConfig.containsKey(TaskConfig.INPUT_STREAMS())) {
+        LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
+            "Specify them using InputDescriptors instead. Ignoring configured task.inputs value of " +
+            userConfig.get(TaskConfig.INPUT_STREAMS()));
+        allowedUserConfig.remove(TaskConfig.INPUT_STREAMS());
+      }
+
+      generatedConfig.putAll(getGeneratedConfig(runId));
     }
 
+    // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
     // creating the StreamManager to get all input/output streams' metadata for planning