You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/11/09 01:50:06 UTC

[samza] branch master updated: SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 86d9b51cd SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)
86d9b51cd is described below

commit 86d9b51cd76bd4f43b49fbcc4d2aa051a8e23ccf
Author: ajo thomas <aj...@linkedin.com>
AuthorDate: Tue Nov 8 17:50:00 2022 -0800

    SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)
---
 .../samza/application/ApplicationApiType.java      | 37 ++++++++++++++++++++++
 .../apache/samza/application/ApplicationUtil.java  | 15 +--------
 .../org/apache/samza/config/ApplicationConfig.java |  8 +++--
 .../org/apache/samza/container/RunLoopFactory.java |  6 ++++
 .../org/apache/samza/execution/JobPlanner.java     | 23 ++++++++++++++
 .../samza/application/TestApplicationUtil.java     | 15 ++++++++-
 6 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java
new file mode 100644
index 000000000..6e9609d5e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+/**
+ * The API a Samza application is written against; recorded by the planner in the app.api.type config.
+ * */
+public enum ApplicationApiType {
+  /**
+   * High-level application, defined by implementing {@link StreamApplication}.
+   * */
+  HIGH_LEVEL,
+  /**
+   * Low-level application, defined by implementing {@link TaskApplication}.
+   * */
+  LOW_LEVEL,
+  /**
+   * Legacy application, defined using the task.class config.
+   * */
+  LEGACY
+}
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index cf52a2568..07badc782 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.application;
 
-import com.google.common.base.Strings;
 import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
@@ -68,22 +67,10 @@ public class ApplicationUtil {
 
   /**
    * Determines if the job is a Samza high-level job.
-   * High-level job implements StreamApplication.
    * @param config config
    * */
   public static boolean isHighLevelApiJob(Config config) {
     final ApplicationConfig applicationConfig = new ApplicationConfig(config);
-    final String appClass = applicationConfig.getAppClass();
-    if (!Strings.isNullOrEmpty(appClass)) {
-      try {
-        return StreamApplication.class.isAssignableFrom(Class.forName(appClass));
-      } catch (ClassNotFoundException e) {
-        LOG.debug("Error while determining if the job is a high level API job", e);
-        return false;
-      }
-    } else {
-      LOG.warn("Config {} is empty or null. Cannot determine if the job is a high-level API job", ApplicationConfig.APP_CLASS);
-      return false;
-    }
+    return applicationConfig.getAppApiType() == ApplicationApiType.HIGH_LEVEL;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 3dabd05c9..96781ad0c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.config;
 
+import org.apache.samza.application.ApplicationApiType;
 import org.apache.samza.clustermanager.DefaultApplicationMain;
 import org.apache.samza.runtime.UUIDGenerator;
 
-
 /**
  * Accessors for configs associated with Application scope
  */
@@ -60,7 +60,7 @@ public class ApplicationConfig extends MapConfig {
   public static final String APP_MAIN_CLASS = "app.main.class";
   public static final String APP_MAIN_ARGS = "app.main.args";
   public static final String APP_RUNNER_CLASS = "app.runner.class";
-
+  public static final String APP_API_TYPE = "app.api.type";
   private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
 
   public ApplicationConfig(Config config) {
@@ -110,4 +110,8 @@ public class ApplicationConfig extends MapConfig {
   public String getAppRunnerClass() {
     return get(APP_RUNNER_CLASS, DEFAULT_APP_RUNNER);
   }
+
+  public ApplicationApiType getAppApiType() {
+    return ApplicationApiType.valueOf(get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 54c5bc5a9..07fa88a70 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -69,6 +69,12 @@ public class RunLoopFactory {
 
     log.info("Got current run Id: {}.", runId);
 
+    if (isHighLevelApiJob) {
+      log.info("The application uses high-level API.");
+    } else {
+      log.info("The application doesn't use high-level API.");
+    }
+
     log.info("Run loop in asynchronous mode.");
 
     return new RunLoop(
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 416a769dd..68423c639 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -18,21 +18,27 @@
  */
 package org.apache.samza.execution;
 
+import com.google.common.base.Strings;
 import java.io.File;
 import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationApiType;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -97,6 +103,23 @@ public abstract class JobPlanner {
       generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
+    // Assign app.api.type config. LegacyTaskApplication is a TaskApplication, so test it before TaskApplication.
+    if (StreamApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+      generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name());
+    } else if (LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+      generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+    } else if (TaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+      generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LOW_LEVEL.name());
+    } else {
+      final TaskConfig taskConfig = new TaskConfig(userConfig);
+      final Optional<String> taskClassOption = taskConfig.getTaskClass();
+      if (!taskClassOption.isPresent() || Strings.isNullOrEmpty(taskClassOption.get())) {
+        // Unrecognized application type and no task.class to fall back on: fail fast.
+        throw new ConfigException("Legacy task applications must set a non-empty task.class in configuration.");
+      }
+      generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+    }
+
     // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index 1d0bd88a6..abcd81239 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.task.MockStreamTask;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 /**
@@ -86,6 +86,19 @@ public class TestApplicationUtil {
     ApplicationUtil.fromConfig(new MapConfig(configMap));
   }
 
+  @Test
+  public void testIsHighLevelJob() {
+    final Map<String, String> configMap = new HashMap<>();
+    configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name());
+    assertTrue(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+
+    configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LOW_LEVEL.name());
+    assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+
+    configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+    assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+  }
+
   /**
    * Test class of {@link TaskApplication} for unit tests
    */