You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/03/18 02:20:59 UTC
[samza] branch master updated: SAMZA-2488: Add
JobCoordinatorLaunchUtil to handle common logic when launching job
coordinator. (#1318)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b7cab95 SAMZA-2488: Add JobCoordinatorLaunchUtil to handle common logic when launching job coordinator. (#1318)
b7cab95 is described below
commit b7cab952317bd4320f12e18c2cc3e3cee77770da
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Tue Mar 17 19:20:52 2020 -0700
SAMZA-2488: Add JobCoordinatorLaunchUtil to handle common logic when launching job coordinator. (#1318)
---
.../clustermanager/ClusterBasedJobCoordinator.java | 59 +++-------------
.../clustermanager/JobCoordinatorLaunchUtil.java | 81 ++++++++++++++++++++++
.../TestClusterBasedJobCoordinator.java | 35 ----------
.../TestJobCoordinatorLaunchUtil.java | 79 +++++++++++++++++++++
4 files changed, 170 insertions(+), 84 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b9d054b..e7647f4 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -35,9 +35,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.application.ApplicationUtil;
-import org.apache.samza.application.descriptors.ApplicationDescriptor;
-import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
-import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
import org.apache.samza.config.ApplicationConfig;
@@ -58,7 +55,6 @@ import org.apache.samza.coordinator.StreamRegexMonitor;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
@@ -571,8 +567,16 @@ public class ClusterBasedJobCoordinator {
throw new SamzaException(e);
}
} else {
- ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
- jc.run();
+ JobConfig jobConfig = new JobConfig(submissionConfig);
+
+ if (!jobConfig.getConfigLoaderFactory().isPresent()) {
+ throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
+ }
+
+ // load full job config with ConfigLoader
+ Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
+
+ JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
}
LOG.info("Finished running ClusterBasedJobCoordinator");
@@ -615,49 +619,6 @@ public class ClusterBasedJobCoordinator {
}
/**
- * Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using
- * specified {@link org.apache.samza.config.ConfigLoaderFactory}
- *
- * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
- * @return {@link ClusterBasedJobCoordinator}
- */
- @VisibleForTesting
- static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
- JobConfig jobConfig = new JobConfig(submissionConfig);
-
- if (!jobConfig.getConfigLoaderFactory().isPresent()) {
- throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
- }
-
- MetricsRegistryMap metrics = new MetricsRegistryMap();
- // load full job config with ConfigLoader
- Config originalConfig = ConfigUtil.loadConfig(submissionConfig);
-
- // Execute planning
- ApplicationDescriptorImpl<? extends ApplicationDescriptor>
- appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
- RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
- List<JobConfig> jobConfigs = planner.prepareJobs();
-
- if (jobConfigs.size() != 1) {
- throw new SamzaException("Only support single remote job is supported.");
- }
-
- Config config = jobConfigs.get(0);
-
- // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
- CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
- DiagnosticsUtil.createDiagnosticsStream(config);
- MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
- metadataStore.init();
-
- return new ClusterBasedJobCoordinator(
- metrics,
- metadataStore,
- config);
- }
-
- /**
* Convert Samza config to command line arguments to invoke app.main.class
*
* @param config Samza config to convert.
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
new file mode 100644
index 0000000..fc1d34e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.List;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.DiagnosticsUtil;
+
+
+/**
+ * Util class to launch and run {@link ClusterBasedJobCoordinator}.
+ * This util is being used by both high/low and beam API Samza jobs.
+ */
+public class JobCoordinatorLaunchUtil {
+ /**
+ * Run {@link ClusterBasedJobCoordinator} with full job config.
+ *
+ * @param app SamzaApplication to run.
+ * @param config full job config.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void run(SamzaApplication app, Config config) {
+ // Execute planning
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor>
+ appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+ RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+ List<JobConfig> jobConfigs = planner.prepareJobs();
+
+ if (jobConfigs.size() != 1) {
+ throw new SamzaException("Only support single remote job is supported.");
+ }
+
+ Config finalConfig = jobConfigs.get(0);
+
+ // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
+ DiagnosticsUtil.createDiagnosticsStream(finalConfig);
+ MetricsRegistryMap metrics = new MetricsRegistryMap();
+ MetadataStore
+ metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig), metrics);
+ // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
+ // initialization of MetadataStore can be moved to ClusterBasedJobCoordinator after we clean up
+ // ClusterBasedJobCoordinator#createFromMetadataStore
+ metadataStore.init();
+
+ ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
+ metrics,
+ metadataStore,
+ finalConfig);
+ jc.run();
+ }
+
+ private JobCoordinatorLaunchUtil() {}
+}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 513c075..967bc81 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -23,30 +23,25 @@ import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.MockSystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.After;
import org.junit.Before;
@@ -68,7 +63,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
-import static org.powermock.api.mockito.PowerMockito.verifyNew;
/**
@@ -213,35 +207,6 @@ public class TestClusterBasedJobCoordinator {
verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
}
- @Test(expected = SamzaException.class)
- public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
- ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
- }
-
- @Test
- public void testCreateFromConfigLoader() throws Exception {
- Map<String, String> config = new HashMap<>();
- config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
- config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
- config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
- getClass().getResource("/test.properties").getPath());
- Config submissionConfig = new MapConfig(config);
- JobConfig fullJobConfig = new JobConfig(ConfigUtil.loadConfig(submissionConfig));
-
- RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
- CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
-
- PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
- PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
- PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
- PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
- when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
-
- ClusterBasedJobCoordinator.createFromConfigLoader(submissionConfig);
-
- verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
- }
-
@Test
public void testToArgs() {
ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
new file mode 100644
index 0000000..827e312
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.RemoteJobPlanner;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ CoordinatorStreamUtil.class,
+ JobCoordinatorLaunchUtil.class,
+ CoordinatorStreamStore.class,
+ RemoteJobPlanner.class})
+public class TestJobCoordinatorLaunchUtil {
+ @Test
+ public void testCreateFromConfigLoader() throws Exception {
+ Map<String, String> config = new HashMap<>();
+ config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getName());
+ config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+ getClass().getResource("/test.properties").getPath());
+ JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
+ JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true")));
+
+ RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
+ CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
+ ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);
+
+ PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+ PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
+ PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
+ PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
+ PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
+ when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+
+ JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
+
+ verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+ verify(mockJC, times(1)).run();
+ }
+}