You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/14 02:02:26 UTC
[gobblin] branch master updated: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4382a008d [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)
4382a008d is described below
commit 4382a008d365fe1b40327d0c10b85cb853a9f45a
Author: Andy Jiang <an...@outlook.com>
AuthorDate: Mon Feb 13 18:02:19 2023 -0800
[GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)
* Add in YarnService changes and unit test
* Test Cleanup
* Add in comments
* Removing unused mocks
* Remove unused imports
* Address comments
* Add javadoc
---
.../java/org/apache/gobblin/yarn/YarnService.java | 15 ++-
.../org/apache/gobblin/yarn/YarnServiceTest.java | 150 +++++++++++++++++++++
.../src/test/resources/YarnServiceTest.conf | 73 ++++++++++
3 files changed, 235 insertions(+), 3 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f899bf74e..86a377f44 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -207,6 +207,7 @@ public class YarnService extends AbstractIdleService {
private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
private volatile boolean shutdownInProgress = false;
+ private volatile boolean startupInProgress = true;
public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus, HelixManager helixManager, HelixAdmin helixAdmin) throws Exception {
@@ -342,7 +343,7 @@ public class YarnService extends AbstractIdleService {
}
@Override
- protected void startUp() throws Exception {
+ protected synchronized void startUp() throws Exception {
LOGGER.info("Starting the YarnService");
// Register itself with the EventBus for container-related requests
@@ -363,6 +364,7 @@ public class YarnService extends AbstractIdleService {
LOGGER.info("Requesting initial containers");
requestInitialContainers(this.initialContainers);
+ startupInProgress = false;
}
private void purgeHelixOfflineInstances(long laggingThresholdMs) {
@@ -462,10 +464,16 @@ public class YarnService extends AbstractIdleService {
*
* @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
* @param inUseInstances a set of in use instances
+ * @return {@code true} if the target number of containers was requested; {@code false} if the request was skipped because the service is still starting up
*/
- public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+ public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
+ if (startupInProgress) {
+ LOGGER.warn("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");
+ return false;
+ }
+
int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
// YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
// based on the max of the requested and actual allocated counts
@@ -524,6 +532,7 @@ public class YarnService extends AbstractIdleService {
this.yarnContainerRequest = yarnContainerRequestBundle;
LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
+ return true;
}
// Request initial containers with default resource and helix tag
@@ -624,7 +633,7 @@ public class YarnService extends AbstractIdleService {
}
- private ByteBuffer getSecurityTokens() throws IOException {
+ protected ByteBuffer getSecurityTokens() throws IOException {
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
Closer closer = Closer.create();
try {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
new file mode 100644
index 000000000..76f094798
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ *
+ * <p>The application-master client is mocked: {@code AMRMClientAsync.createAMRMClientAsync} is stubbed to
+ * return a mock, and the Helix manager/admin handed to the service are Mockito mocks.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase {
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+
+  AMRMClientAsync mockAMRMClient;
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  Resource mockResource;
+  FileSystem mockFs;
+
+  /**
+   * Loads {@code YarnServiceTest.conf} from the classpath, stubs the application-master client and its
+   * registration response, and constructs (without starting) the {@link YarnService} under test.
+   *
+   * @throws Exception if the configuration cannot be loaded or the service cannot be constructed
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+    mockFs = Mockito.mock(FileSystem.class);
+
+    String resourceName = YarnServiceTest.class.getSimpleName() + ".conf";
+    URL url = YarnServiceTest.class.getClassLoader().getResource(resourceName);
+    // Report the resource name: url itself is always null when this assertion fires.
+    Assert.assertNotNull(url, "Could not find resource " + resourceName);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    // NOTE(review): AMRMClientAsyncImpl is not listed in @PrepareForTest; confirm this static mocking is
+    // effective and still needed.
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, mockFs, this.eventBus);
+  }
+
+  /**
+   * Tests the race between YarnService start-up and container requests: a request made before
+   * {@code startUp()} has completed must be rejected (returns {@code false}), and a request made
+   * afterwards must be accepted (returns {@code true}).
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception {
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(this.yarnService.requestTargetNumberOfContainers(
+        new YarnContainerRequestBundle(), Collections.<String>emptySet()));
+
+    // Start the yarn service
+    this.yarnService.startUp();
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(this.yarnService.requestTargetNumberOfContainers(
+        new YarnContainerRequestBundle(), Collections.<String>emptySet()));
+  }
+
+  /**
+   * {@link YarnService} wired with mocked Helix collaborators, a stub container launch context and a
+   * mocked security-token buffer.
+   */
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId,
+        YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus,
+          getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    /** Returns a mock {@link HelixManager} that reports the cluster name from {@code config}. */
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      Mockito.when(helixManager.getClusterName())
+          .thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      return helixManager;
+    }
+
+    /** Returns a bare mock {@link HelixAdmin}. */
+    private static HelixAdmin getMockHelixAdmin() {
+      return Mockito.mock(HelixAdmin.class);
+    }
+
+    /** Builds a minimal launch context whose only command is {@code sleep 60000}. */
+    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
+        throws IOException {
+      return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
+          Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+    }
+
+    /** Returns a mocked token buffer instead of reading the current user's credentials. */
+    @Override
+    protected ByteBuffer getSecurityTokens() throws IOException {
+      return mock(ByteBuffer.class);
+    }
+  }
+
+  /** Supplies the PowerMock object factory to TestNG. */
+  @ObjectFactory
+  public IObjectFactory getObjectFactory() {
+    return new PowerMockObjectFactory();
+  }
+}
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 000000000..73ecf85cd
--- /dev/null
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Yarn/Helix configuration properties
+gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
+gobblin.yarn.app.name=YarnServiceTest
+gobblin.yarn.work.dir=YarnServiceTest
+
+gobblin.yarn.lib.jars.dir="build/gobblin-yarn/libs"
+gobblin.yarn.conf.dir="gobblin-yarn/src/test/resources"
+gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}"/log4j-yarn.properties,"${gobblin.yarn.conf.dir}"/application.conf,yarn-site.xml,dynamic.conf"
+gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
+gobblin.yarn.app.queue=default
+gobblin.yarn.app.master.memory.mbs=64
+gobblin.yarn.app.master.cores=1
+gobblin.yarn.app.report.interval.minutes=1
+gobblin.yarn.max.get.app.report.failures=4
+gobblin.yarn.email.notification.on.shutdown=false
+gobblin.yarn.initial.containers=0
+gobblin.yarn.container.memory.mbs=64
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=true
+gobblin.yarn.helix.instance.max.retries=2
+gobblin.yarn.logs.sink.root.dir=${gobblin.yarn.work.dir}/applogs
+
+# File system URIs
+fs.uri="file:///"
+writer.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+
+# Writer related configuration properties
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
+writer.output.dir=${gobblin.yarn.work.dir}/task-output
+
+# Data publisher related configuration properties
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
+data.publisher.replace.final.dir=false
+
+# Directory where job/task state files are stored
+state.store.dir=${gobblin.yarn.work.dir}/state-store
+
+# Directory where error files from the quality checkers are stored
+qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
+
+# Disable job locking for now
+job.lock.enabled=false
+
+# Interval of task state reporting in milliseconds
+task.status.reportintervalinms=1000
+
+# Whether the job execution history server should be enabled
+job.execinfo.server.enabled=false
+
+# Enable metrics / events
+metrics.enabled=false
\ No newline at end of file