You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/11/09 01:50:06 UTC
[samza] branch master updated: SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 86d9b51cd SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)
86d9b51cd is described below
commit 86d9b51cd76bd4f43b49fbcc4d2aa051a8e23ccf
Author: ajo thomas <aj...@linkedin.com>
AuthorDate: Tue Nov 8 17:50:00 2022 -0800
SAMZA-2741: [Pipeline Drain] Fix ApplicationUtil.isHighLevelApiJob to work for anonymous SamzaApplication (#1640)
---
.../samza/application/ApplicationApiType.java | 37 ++++++++++++++++++++++
.../apache/samza/application/ApplicationUtil.java | 15 +--------
.../org/apache/samza/config/ApplicationConfig.java | 8 +++--
.../org/apache/samza/container/RunLoopFactory.java | 6 ++++
.../org/apache/samza/execution/JobPlanner.java | 23 ++++++++++++++
.../samza/application/TestApplicationUtil.java | 15 ++++++++-
6 files changed, 87 insertions(+), 17 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java
new file mode 100644
index 000000000..6e9609d5e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/application/ApplicationApiType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+/**
+ * Enum to denote the possible API types for a samza application.
+ * */
+public enum ApplicationApiType {
+ /**
+ * Defined using {@link StreamApplication}
+ * */
+ HIGH_LEVEL,
+ /**
+ * Defined using {@link TaskApplication}
+ * */
+ LOW_LEVEL,
+ /**
+ * Defined using defined using task.class config.
+ * */
+ LEGACY
+}
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index cf52a2568..07badc782 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -18,7 +18,6 @@
*/
package org.apache.samza.application;
-import com.google.common.base.Strings;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
@@ -68,22 +67,10 @@ public class ApplicationUtil {
/**
* Determines if the job is a Samza high-level job.
- * High-level job implements StreamApplication.
* @param config config
* */
public static boolean isHighLevelApiJob(Config config) {
final ApplicationConfig applicationConfig = new ApplicationConfig(config);
- final String appClass = applicationConfig.getAppClass();
- if (!Strings.isNullOrEmpty(appClass)) {
- try {
- return StreamApplication.class.isAssignableFrom(Class.forName(appClass));
- } catch (ClassNotFoundException e) {
- LOG.debug("Error while determining if the job is a high level API job", e);
- return false;
- }
- } else {
- LOG.warn("Config {} is empty or null. Cannot determine if the job is a high-level API job", ApplicationConfig.APP_CLASS);
- return false;
- }
+ return applicationConfig.getAppApiType() == ApplicationApiType.HIGH_LEVEL;
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 3dabd05c9..96781ad0c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -18,10 +18,10 @@
*/
package org.apache.samza.config;
+import org.apache.samza.application.ApplicationApiType;
import org.apache.samza.clustermanager.DefaultApplicationMain;
import org.apache.samza.runtime.UUIDGenerator;
-
/**
* Accessors for configs associated with Application scope
*/
@@ -60,7 +60,7 @@ public class ApplicationConfig extends MapConfig {
public static final String APP_MAIN_CLASS = "app.main.class";
public static final String APP_MAIN_ARGS = "app.main.args";
public static final String APP_RUNNER_CLASS = "app.runner.class";
-
+ public static final String APP_API_TYPE = "app.api.type";
private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
public ApplicationConfig(Config config) {
@@ -110,4 +110,8 @@ public class ApplicationConfig extends MapConfig {
public String getAppRunnerClass() {
return get(APP_RUNNER_CLASS, DEFAULT_APP_RUNNER);
}
+
+ public ApplicationApiType getAppApiType() {
+ return ApplicationApiType.valueOf(get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 54c5bc5a9..07fa88a70 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -69,6 +69,12 @@ public class RunLoopFactory {
log.info("Got current run Id: {}.", runId);
+ if (isHighLevelApiJob) {
+ log.info("The application uses high-level API.");
+ } else {
+ log.info("The application doesn't use high-level API.");
+ }
+
log.info("Run loop in asynchronous mode.");
return new RunLoop(
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 416a769dd..68423c639 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -18,21 +18,27 @@
*/
package org.apache.samza.execution;
+import com.google.common.base.Strings;
import java.io.File;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationApiType;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ShellCommandConfig;
@@ -97,6 +103,23 @@ public abstract class JobPlanner {
generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
}
+ // Assign app.api.type config
+ if (StreamApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+ generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name());
+ } else if (TaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+ generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LOW_LEVEL.name());
+ } else if (LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+ generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+ } else {
+ final TaskConfig taskConfig = new TaskConfig(userConfig);
+ final Optional<String> taskClassOption = taskConfig.getTaskClass();
+ if (!taskClassOption.isPresent() || Strings.isNullOrEmpty(taskClassOption.get())) {
+ // no task.class defined either. This is wrong.
+ throw new ConfigException("Legacy task applications must set a non-empty task.class in configuration.");
+ }
+ generatedConfig.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+ }
+
// merge user-provided configuration with generated configuration. generated configuration has lower priority.
Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index 1d0bd88a6..abcd81239 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.TaskConfig;
import org.apache.samza.task.MockStreamTask;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
@@ -86,6 +86,19 @@ public class TestApplicationUtil {
ApplicationUtil.fromConfig(new MapConfig(configMap));
}
+ @Test
+ public void testIsHighLevelJob() {
+ final Map<String, String> configMap = new HashMap<>();
+ configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name());
+ assertTrue(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+
+ configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LOW_LEVEL.name());
+ assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+
+ configMap.put(ApplicationConfig.APP_API_TYPE, ApplicationApiType.LEGACY.name());
+ assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
+ }
+
/**
* Test class of {@link TaskApplication} for unit tests
*/