You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/14 02:02:26 UTC

[gobblin] branch master updated: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4382a008d [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)
4382a008d is described below

commit 4382a008d365fe1b40327d0c10b85cb853a9f45a
Author: Andy Jiang <an...@outlook.com>
AuthorDate: Mon Feb 13 18:02:19 2023 -0800

    [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service (#3638)
    
    * Add in YarnService changes and unit test
    
    * Test Cleanup
    
    * Add in comments
    
    * Removing unused mocks
    
    * Remove unused imports
    
    * Address comments
    
    * Add javadoc
---
 .../java/org/apache/gobblin/yarn/YarnService.java  |  15 ++-
 .../org/apache/gobblin/yarn/YarnServiceTest.java   | 150 +++++++++++++++++++++
 .../src/test/resources/YarnServiceTest.conf        |  73 ++++++++++
 3 files changed, 235 insertions(+), 3 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f899bf74e..86a377f44 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -207,6 +207,7 @@ public class YarnService extends AbstractIdleService {
   private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
 
   private volatile boolean shutdownInProgress = false;
+  private volatile boolean startupInProgress = true;
 
   public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus, HelixManager helixManager, HelixAdmin helixAdmin) throws Exception {
@@ -342,7 +343,7 @@ public class YarnService extends AbstractIdleService {
   }
 
   @Override
-  protected void startUp() throws Exception {
+  protected synchronized void startUp() throws Exception {
     LOGGER.info("Starting the YarnService");
 
     // Register itself with the EventBus for container-related requests
@@ -363,6 +364,7 @@ public class YarnService extends AbstractIdleService {
 
     LOGGER.info("Requesting initial containers");
     requestInitialContainers(this.initialContainers);
+    startupInProgress = false;
   }
 
   private void purgeHelixOfflineInstances(long laggingThresholdMs) {
@@ -462,10 +464,16 @@ public class YarnService extends AbstractIdleService {
    *
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
+   * @return whether successfully requested the target number of containers
    */
-  public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+  public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
     LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
         yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
+    if (startupInProgress) {
+      LOGGER.warn("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");
+      return false;
+    }
+
     int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
     // YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
@@ -524,6 +532,7 @@ public class YarnService extends AbstractIdleService {
     this.yarnContainerRequest = yarnContainerRequestBundle;
     LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
         yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
+    return true;
   }
 
   // Request initial containers with default resource and helix tag
@@ -624,7 +633,7 @@ public class YarnService extends AbstractIdleService {
   }
 
 
-  private ByteBuffer getSecurityTokens() throws IOException {
+  protected ByteBuffer getSecurityTokens() throws IOException {
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
     Closer closer = Closer.create();
     try {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
new file mode 100644
index 000000000..76f094798
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}, run against mocked YARN AM-RM and Helix clients.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase {
+  private static final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private final YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  // Mocks created in setUp(); the static AMRMClientAsync factory is stubbed to return mockAMRMClient
+  AMRMClientAsync mockAMRMClient;
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  Resource mockResource;
+  FileSystem mockFs;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+    mockFs = Mockito.mock(FileSystem.class);
+
+    String confResource = YarnServiceTest.class.getSimpleName() + ".conf";
+    URL url = YarnServiceTest.class.getClassLoader().getResource(confResource);
+    Assert.assertNotNull(url, "Could not find resource " + confResource);
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    // Stub the static factory so that AMRMClientAsync.createAMRMClientAsync(...) returns mockAMRMClient
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+
+    // Create the test yarn service, but don't start it yet: testYarnStartUpFirst() calls startUp() itself
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, mockFs, this.eventBus);
+  }
+
+  /**
+   * Verifies that {@link YarnService#requestTargetNumberOfContainers} is rejected (returns false) until
+   * {@link YarnService#startUp()} has finished, including Helix offline instance purging, and accepted afterwards.
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception {
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.emptySet()));
+
+    // Start the yarn service
+    this.yarnService.startUp();
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.emptySet()));
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+    // Helix is mocked: only the cluster name (taken from config) and metadata store connection string are stubbed
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      return helixManager;
+    }
+
+    private static HelixAdmin getMockHelixAdmin() { return Mockito.mock(HelixAdmin.class); }
+    // Minimal launch context whose only command is "sleep 60000"; every other argument is empty or null
+    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
+        throws IOException {
+      return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), Collections.emptyMap(),
+          Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, Collections.emptyMap());
+    }
+    // Stubbed so the test does not read the current user's UGI credentials
+    @Override
+    protected ByteBuffer getSecurityTokens() throws IOException { return mock(ByteBuffer.class); }
+  }
+  // Have TestNG instantiate this class through PowerMock, which the static mocks in setUp() rely on
+  @ObjectFactory
+  public IObjectFactory getObjectFactory() {
+    return new PowerMockObjectFactory();
+  }
+}
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 000000000..73ecf85cd
--- /dev/null
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Yarn/Helix configuration properties
+gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
+gobblin.yarn.app.name=YarnServiceTest
+gobblin.yarn.work.dir=YarnServiceTest
+
+gobblin.yarn.lib.jars.dir="build/gobblin-yarn/libs"
+gobblin.yarn.conf.dir="gobblin-yarn/src/test/resources"
+gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}"/log4j-yarn.properties,"${gobblin.yarn.conf.dir}"/application.conf,yarn-site.xml,dynamic.conf"
+gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
+gobblin.yarn.app.queue=default
+gobblin.yarn.app.master.memory.mbs=64
+gobblin.yarn.app.master.cores=1
+gobblin.yarn.app.report.interval.minutes=1
+gobblin.yarn.max.get.app.report.failures=4
+gobblin.yarn.email.notification.on.shutdown=false
+gobblin.yarn.initial.containers=0
+gobblin.yarn.container.memory.mbs=64
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=true
+gobblin.yarn.helix.instance.max.retries=2
+gobblin.yarn.logs.sink.root.dir=${gobblin.yarn.work.dir}/applogs
+
+# File system URIs
+fs.uri="file:///"
+writer.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+
+# Writer related configuration properties
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
+writer.output.dir=${gobblin.yarn.work.dir}/task-output
+
+# Data publisher related configuration properties
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
+data.publisher.replace.final.dir=false
+
+# Directory where job/task state files are stored
+state.store.dir=${gobblin.yarn.work.dir}/state-store
+
+# Directory where error files from the quality checkers are stored
+qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
+
+# Disable job locking for now
+job.lock.enabled=false
+
+# Interval of task state reporting in milliseconds
+task.status.reportintervalinms=1000
+
+# If the job execution history server should be enabled
+job.execinfo.server.enabled=false
+
+# Enable metrics / events
+metrics.enabled=false
\ No newline at end of file