Posted to dev@gobblin.apache.org by "AndyJiang99 (via GitHub)" <gi...@apache.org> on 2023/02/07 23:15:00 UTC

[GitHub] [gobblin] AndyJiang99 opened a new pull request, #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

AndyJiang99 opened a new pull request, #3638:
URL: https://github.com/apache/gobblin/pull/3638

   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   Offline Helix instances are purged during startup of the YARN service. This operation must run while no Helix instances are being added or removed (i.e. the purge API call is not thread safe).
   The current implementation blocks the YARN service from allocating initial containers while Helix instance purging is enabled, but it does not prevent other services from requesting containers through its public methods.
   These two services start up concurrently, so the AutoScalingYarnManager can start before the YARN service has finished purging. This can lead to the AutoScalingYarnManager calling requestContainers while instances are still being purged.
   
   This change makes YARN service startup synchronous and adds a flag indicating whether startup is complete. The flag is flipped once the YARN service has finished starting up, and requestTargetNumberOfContainers reads it to check whether startup has completed. This prevents requestContainers from executing before YARN service startup has completed, which eliminates the race condition.
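A minimal sketch of the gating described above, assuming a simplified stand-in class. `StartupGatedYarnService`, its `startupInProgress` and `purged` fields, and the plain `int` target (in place of `YarnContainerRequestBundle`) are hypothetical illustrations, not Gobblin's actual `YarnService` code; the purge method is only a placeholder for `HelixAdmin#purgeOfflineInstances`.

```java
import java.util.Set;

/**
 * Hypothetical, simplified stand-in for YarnService (not Gobblin's code): container
 * requests are refused until startUp() has finished purging offline Helix instances.
 */
class StartupGatedYarnService {
  // volatile: the write at the end of startUp() must be visible to other threads
  // (e.g. an auto-scaling manager) that call requestTargetNumberOfContainers().
  private volatile boolean startupInProgress = true;
  private volatile boolean purged = false;
  private int requestedContainers = 0;

  /** Synchronous startup: purge first, and only then open the gate. */
  public void startUp() {
    purgeOfflineInstances();
    startupInProgress = false;
  }

  // Placeholder for HelixAdmin#purgeOfflineInstances(clusterName, offlineDuration).
  private void purgeOfflineInstances() {
    purged = true;
  }

  /**
   * Returns false without doing anything while startup is still in progress, so the
   * caller can retry later; returns true once the target has been recorded.
   */
  public synchronized boolean requestTargetNumberOfContainers(int target, Set<String> inUseInstances) {
    if (startupInProgress) {
      return false;
    }
    requestedContainers = target;
    return true;
  }

  public synchronized int getRequestedContainers() {
    return requestedContainers;
  }

  public boolean isPurged() {
    return purged;
  }
}
```

Because the flag only ever flips from `true` to `false`, and only after the purge has returned, a caller that observes `false` knows the purge is already finished. This sketch assumes a rejected caller simply retries on a later scaling pass; blocking callers on a `CountDownLatch` until startup completes would be an alternative if rejecting requests is undesirable.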
   
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1781
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] AndyJiang99 commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "AndyJiang99 (via GitHub)" <gi...@apache.org>.
AndyJiang99 commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099445812


##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);

Review Comment:
   The `@Mock` annotations don't seem to initialize these mocks, so I still had to reinitialize them here. I removed the `@Mock` annotations at the top, and it works now.



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn container request
+   * Block on creating new yarn containers until start up of the yarn service and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception{
+    boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(canRequestNewContainers);
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+      Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+      Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+      return helixManager;
+    }
+
+    private static HelixAdmin getMockHelixAdmin() {
+      HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+      Mockito.doNothing().when(helixAdmin).purgeOfflineInstances(Mockito.anyString(), Mockito.anyLong());
+      Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());

Review Comment:
   These aren't used





[GitHub] [gobblin] homatthew commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099396196


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -208,6 +208,9 @@ public class YarnService extends AbstractIdleService {
 
   private volatile boolean shutdownInProgress = false;
 
+  @VisibleForTesting

Review Comment:
   Is this really referenced in the tests?



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn container request
+   * Block on creating new yarn containers until start up of the yarn service and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception{
+    boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(canRequestNewContainers);
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+      Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+      Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);

Review Comment:
   Are these ever used? 



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn container request
+   * Block on creating new yarn containers until start up of the yarn service and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception{
+    boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);

Review Comment:
   Nit: I think the extra variable here is overkill and differs from the style of the other unit tests. Most of them just assert on the method call directly, especially when it returns a boolean.
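Applied to this test, the suggestion would read `Assert.assertFalse(this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET));`. The sketch below shows the same inline style end to end using only the JDK; `GatedRequester` and the `Checks` helpers are hypothetical stand-ins for `YarnService` and TestNG's `Assert`, not real Gobblin or TestNG code.

```java
/** Hypothetical minimal service with a startup gate, used only to illustrate assertion style. */
class GatedRequester {
  private volatile boolean started = false;

  void startUp() {
    started = true;
  }

  boolean requestTargetNumberOfContainers() {
    return started;
  }
}

/** Tiny stand-ins for TestNG's Assert.assertFalse / Assert.assertTrue so the sketch needs only the JDK. */
final class Checks {
  static void assertFalse(boolean condition, String message) {
    if (condition) {
      throw new AssertionError(message);
    }
  }

  static void assertTrue(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}

class InlineAssertionStyle {
  static void testYarnStartUpFirst() {
    GatedRequester service = new GatedRequester();
    // Assert on the call directly instead of storing it in a local variable first.
    Checks.assertFalse(service.requestTargetNumberOfContainers(),
        "Requests should be rejected before startUp()");
    service.startUp();
    Checks.assertTrue(service.requestTargetNumberOfContainers(),
        "Requests should be accepted after startUp()");
  }
}
```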



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.testng.PowerMockObjectFactory;
+import org.powermock.modules.testng.PowerMockTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+
+import static org.mockito.Matchers.*;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn container request
+   * Block on creating new yarn containers until start up of the yarn service and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception{
+    boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.EMPTY_SET);
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(canRequestNewContainers);
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+      Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+      Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);
+
+      return helixManager;
+    }
+
+    private static HelixAdmin getMockHelixAdmin() {
+      HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+      Mockito.doNothing().when(helixAdmin).purgeOfflineInstances(Mockito.anyString(), Mockito.anyLong());
+      Mockito.doNothing().when(helixAdmin).enableInstance(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean());

Review Comment:
   Is this used? I thought purging offline instances was disabled.



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);

Review Comment:
   Nit: This mock call feels out of place, since you initialize the other mocks with annotations. I'd suggest making this one an annotation too.



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -208,6 +208,9 @@ public class YarnService extends AbstractIdleService {
 
   private volatile boolean shutdownInProgress = false;
 
+  @VisibleForTesting
+  protected volatile boolean startupInProgress = true;

Review Comment:
   What's the reason for this being protected?



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -463,9 +467,13 @@ private EventSubmitter buildEventSubmitter() {
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+  public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
     LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
         yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
+    if (startupInProgress) {
+      LOGGER.info("RequestTargetNumberOfContainers waiting for startup to complete first.");
+      return false;
+    }

Review Comment:
   Nit: add a blank line after the code block.
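A minimal, self-contained sketch of the guard pattern in this hunk, assuming a simplified hypothetical class `StartupGatedService` (not Gobblin code) that models only the startup flag: the flag starts `true`, `startUp()` clears it after the purge, and `requestTargetNumberOfContainers` returns early while it is still set. The blank line after the guard block follows the nit above.

```java
import java.util.Set;

/**
 * Hypothetical, simplified stand-in for YarnService that models only the startup gate.
 * Not Gobblin code; names are illustrative.
 */
class StartupGatedService {

  // Starts true and is cleared exactly once, at the end of startUp().
  // volatile so that threads calling requestTargetNumberOfContainers see the update.
  private volatile boolean startupInProgress = true;

  private int requestedContainers = 0;

  /** Runs startup work (e.g. the offline-instance purge), then opens the gate. */
  void startUp() {
    purgeOfflineInstances();
    startupInProgress = false;
  }

  private void purgeOfflineInstances() {
    // Placeholder for the non-thread-safe Helix purge; empty in this sketch.
  }

  /**
   * @return true if the request was applied, false if it was skipped because startup is still in progress
   */
  synchronized boolean requestTargetNumberOfContainers(int numTargetContainers, Set<String> inUseInstances) {
    if (startupInProgress) {
      return false;
    }

    // inUseInstances is unused in this sketch; only the gating behavior is modeled.
    requestedContainers = numTargetContainers;
    return true;
  }

  synchronized int getRequestedContainers() {
    return requestedContainers;
  }
}
```

Calling `requestTargetNumberOfContainers` before `startUp()` returns `false` and leaves the requested count at 0; after `startUp()` the same call returns `true` and records the target.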



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);

Review Comment:
   Also, you prefix your other mock variables with `mock`, but this one is missing that prefix.



##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);

Review Comment:
   If you are using the mock annotations, I don't think you need to re-initialize these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1105137753


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -462,10 +464,16 @@ private EventSubmitter buildEventSubmitter() {
    *
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
+   * @return whether the requestTargetNumberOfContainers function has executed yet

Review Comment:
   I would argue that the return value indicates whether the requested number of containers could actually be obtained after service initialization. Describing the execution itself is ambiguous, since the method still executes when it returns false.
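The reviewer's point can be illustrated with a small concurrent sketch. It assumes a hypothetical class `PurgeRaceDemo` (not Gobblin code) in which `startUp()` runs on a background thread and blocks on a `CountDownLatch` to stand in for a long-running purge. A request issued during the purge still executes but returns `false`; one issued after the startup thread is joined returns `true`. The `@return` text shows one possible wording along the lines the reviewer suggests.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo of the startup/request race; not Gobblin code. */
class PurgeRaceDemo {

  // Cleared only after the simulated purge has finished.
  private volatile boolean startupInProgress = true;

  private final CountDownLatch purgeStarted = new CountDownLatch(1);
  private final CountDownLatch allowPurgeToFinish = new CountDownLatch(1);

  /** Simulated startUp(): signals that the purge began, waits until released, then opens the gate. */
  void startUp() throws InterruptedException {
    purgeStarted.countDown();
    allowPurgeToFinish.await(); // stands in for a long-running purgeOfflineInstances call
    startupInProgress = false;
  }

  /**
   * The method always runs; the return value reports whether the container request was actually issued.
   *
   * @return true if the request was issued, false if it was skipped because startup is still in progress
   */
  synchronized boolean requestTargetNumberOfContainers(int numTargetContainers, Set<String> inUseInstances) {
    if (startupInProgress) {
      return false;
    }

    // A real implementation would issue YARN container requests here.
    return true;
  }

  /** Runs startUp() on a background thread; returns {result during purge, result after startup}. */
  static boolean[] run() {
    PurgeRaceDemo service = new PurgeRaceDemo();
    Thread startupThread = new Thread(() -> {
      try {
        service.startUp();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    try {
      startupThread.start();
      service.purgeStarted.await(); // the purge is now "in progress"
      boolean duringPurge = service.requestTargetNumberOfContainers(2, Collections.emptySet());

      service.allowPurgeToFinish.countDown();
      startupThread.join(); // join() also makes the flag update visible to this thread
      boolean afterStartup = service.requestTargetNumberOfContainers(2, Collections.emptySet());
      return new boolean[] {duringPurge, afterStartup};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while running the demo", e);
    }
  }
}
```

The latches make the interleaving deterministic, so the "during purge" request is always rejected rather than depending on thread timing.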





[GitHub] [gobblin] codecov-commenter commented on pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#issuecomment-1421690291

   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3638](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (32242fc) into [master](https://codecov.io/gh/apache/gobblin/commit/69f3f9c33c9679d723c447fbe626db8f1b7afa9e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (69f3f9c) will **increase** coverage by `3.48%`.
   > The diff coverage is `16.66%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3638      +/-   ##
   ============================================
   + Coverage     46.58%   50.07%   +3.48%     
   + Complexity    10672     5766    -4906     
   ============================================
     Files          2133     1065    -1068     
     Lines         83557    40499   -43058     
     Branches       9290     4541    -4749     
   ============================================
   - Hits          38928    20278   -18650     
   + Misses        41068    18466   -22602     
   + Partials       3561     1755    -1806     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...main/java/org/apache/gobblin/yarn/YarnService.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFyblNlcnZpY2UuamF2YQ==) | `15.08% <16.66%> (+0.05%)` | :arrow_up: |
   | [...a/management/copy/publisher/CopyDataPublisher.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcHVibGlzaGVyL0NvcHlEYXRhUHVibGlzaGVyLmphdmE=) | `72.25% <0.00%> (-1.30%)` | :arrow_down: |
   | [...anagement/copy/replication/ConfigBasedDataset.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcmVwbGljYXRpb24vQ29uZmlnQmFzZWREYXRhc2V0LmphdmE=) | `68.87% <0.00%> (ø)` | |
   | [...apache/gobblin/service/modules/flow/FlowUtils.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93L0Zsb3dVdGlscy5qYXZh) | | |
   | [...a/org/apache/gobblin/util/http/HttpLimiterKey.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvaHR0cC9IdHRwTGltaXRlcktleS5qYXZh) | | |
   | [...ain/java/org/apache/gobblin/writer/JdbcWriter.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tc3FsL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3dyaXRlci9KZGJjV3JpdGVyLmphdmE=) | | |
   | [.../org/apache/gobblin/metastore/MetaStoreModule.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRhc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0YXN0b3JlL01ldGFTdG9yZU1vZHVsZS5qYXZh) | | |
   | [...he/gobblin/config/common/impl/ImportTraverser.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb25maWctbWFuYWdlbWVudC9nb2JibGluLWNvbmZpZy1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbmZpZy9jb21tb24vaW1wbC9JbXBvcnRUcmF2ZXJzZXIuamF2YQ==) | | |
   | [.../metrics/kafka/KafkaAvroEventKeyValueReporter.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL21ldHJpY3Mva2Fma2EvS2Fma2FBdnJvRXZlbnRLZXlWYWx1ZVJlcG9ydGVyLmphdmE=) | | |
   | [...rvice/modules/flowgraph/pathfinder/PathFinder.java](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9mbG93Z3JhcGgvcGF0aGZpbmRlci9QYXRoRmluZGVyLmphdmE=) | | |
   | ... and [1063 more](https://codecov.io/gh/apache/gobblin/pull/3638?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [gobblin] AndyJiang99 commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "AndyJiang99 (via GitHub)" <gi...@apache.org>.
AndyJiang99 commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099446504


##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,187 @@
+/**
+ * Tests for {@link YarnService}.
+ */
+@PrepareForTest({AMRMClientAsync.class, RegisterApplicationMasterResponse.class})
+@PowerMockIgnore({"javax.management.*"})
+public class YarnServiceTest extends PowerMockTestCase{
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf = new YarnConfiguration();
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+  @Mock
+  AMRMClientAsync mockAMRMClient;
+  @Mock
+  RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  @Mock
+  Resource mockResource;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class);
+    mockResource = Mockito.mock(Resource.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + YarnServiceTest.class.getSimpleName() + ".conf");
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    PowerMockito.mockStatic(AMRMClientAsync.class);
+    PowerMockito.mockStatic(AMRMClientAsyncImpl.class);
+
+    when(AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(mockResource);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+
+    // Create the test yarn service, but don't start yet
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf, fs, this.eventBus);
+  }
+
+  /**
+   * Testing the race condition between the yarn start up and creating yarn container request
+   * Block on creating new yarn containers until start up of the yarn service and purging is complete
+   */
+  @Test(groups = {"gobblin.yarn"})
+  public void testYarnStartUpFirst() throws Exception {
+    boolean canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.emptySet());
+
+    // Not allowed to request target number of containers since yarnService hasn't started up yet.
+    Assert.assertFalse(canRequestNewContainers);
+
+    // Start the yarn service
+    this.yarnService.startUp();
+    canRequestNewContainers = this.yarnService.requestTargetNumberOfContainers(new YarnContainerRequestBundle(), Collections.emptySet());
+
+    // Allowed to request target number of containers after yarnService is started up.
+    Assert.assertTrue(canRequestNewContainers);
+  }
+
+  static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, getMockHelixManager(config), getMockHelixAdmin());
+    }
+
+    private static HelixManager getMockHelixManager(Config config) {
+      HelixManager helixManager = Mockito.mock(HelixManager.class);
+      HelixDataAccessor helixDataAccessor = Mockito.mock(HelixDataAccessor.class);
+      PropertyKey propertyKey = Mockito.mock(PropertyKey.class);
+      PropertyKey.Builder propertyKeyBuilder = Mockito.mock(PropertyKey.Builder.class);
+
+      Mockito.when(helixManager.getInstanceName()).thenReturn("helixInstance1");
+      Mockito.when(helixManager.getClusterName()).thenReturn(config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+      Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+      Mockito.when(helixManager.getMetadataStoreConnectionString()).thenReturn("stub");
+      Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(propertyKeyBuilder);
+      Mockito.when(propertyKeyBuilder.liveInstance(Mockito.anyString())).thenReturn(propertyKey);
+      Mockito.when(helixDataAccessor.getProperty(propertyKey)).thenReturn(null);

Review Comment:
   Some of these variables are used, like clusterName and MetadataStoreName. I've removed the ones that aren't used
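The race exercised by `testYarnStartUpFirst` can also be reproduced without YARN, Helix, or PowerMock. The following is a minimal JDK-only sketch, not the PR's code: `GatedService` and `RaceDemo` are hypothetical names, and a `CountDownLatch` stands in for the Helix offline-instance purge. While `startUp()` is blocked on the latch, a concurrent `requestTargetNumberOfContainers` call is rejected; once the purge finishes and the startup thread is joined, the same call succeeds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for YarnService: startUp() blocks on a latch to
// simulate a slow Helix offline-instance purge.
class GatedService {
  private volatile boolean startupInProgress = true;
  private final CountDownLatch purgeDone;

  GatedService(CountDownLatch purgeDone) {
    this.purgeDone = purgeDone;
  }

  void startUp() throws InterruptedException {
    purgeDone.await();          // simulated purge; no requests may run yet
    startupInProgress = false;  // volatile write publishes completion
  }

  synchronized boolean requestTargetNumberOfContainers(int numContainers) {
    if (startupInProgress) {
      return false;             // reject while startup/purge is still running
    }
    // ... a real implementation would issue container requests here ...
    return true;
  }
}

class RaceDemo {
  // Returns {result during startup, result after startup}.
  static boolean[] run() {
    CountDownLatch purgeDone = new CountDownLatch(1);
    GatedService service = new GatedService(purgeDone);

    Thread starter = new Thread(() -> {
      try {
        service.startUp();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    starter.start();

    // A concurrent caller (in the role of the auto-scaling manager) asks for
    // containers while the simulated purge is still blocked on the latch.
    boolean duringStartup = service.requestTargetNumberOfContainers(5);

    purgeDone.countDown();      // let the simulated purge finish
    try {
      starter.join(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }

    boolean afterStartup = service.requestTargetNumberOfContainers(5);
    return new boolean[] {duringStartup, afterStartup};
  }
}
```

Because the flag only ever goes from `true` to `false`, and is written only after the purge completes, a caller that reads `false` is guaranteed by the `volatile` happens-before rule to observe a finished purge.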



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [gobblin] homatthew commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099412670


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -208,6 +208,9 @@ public class YarnService extends AbstractIdleService {
 
   private volatile boolean shutdownInProgress = false;
 
+  @VisibleForTesting
+  protected volatile boolean startupInProgress = true;

Review Comment:
   Is this protected because startup is also protected?
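One plausible reading of the question: Guava's `AbstractIdleService.startUp()` is `protected`, and a `protected` field lets a subclass such as the test's `TestYarnService` read or reset the flag without reflection. In Java, `protected` also grants access to classes in the same package. The `volatile` modifier is what makes a write from the startup thread visible to other threads. A minimal sketch, with hypothetical class names `BaseService` and `InspectableService`:

```java
// Hypothetical base class standing in for YarnService.
abstract class BaseService {
  // protected: visible to subclasses (such as a test subclass) and the package;
  // volatile: a write by the startup thread is visible to other threads.
  protected volatile boolean startupInProgress = true;

  protected void startUp() {
    // ... startup work (e.g. purging offline instances) would happen here ...
    startupInProgress = false;
  }
}

// A test-only subclass can observe the flag directly, without reflection.
class InspectableService extends BaseService {
  boolean isStartingUp() {
    return startupInProgress;
  }

  void runStartUp() {
    startUp();
  }
}
```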





[GitHub] [gobblin] Will-Lo merged pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo merged PR #3638:
URL: https://github.com/apache/gobblin/pull/3638




[GitHub] [gobblin] homatthew commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "homatthew (via GitHub)" <gi...@apache.org>.
homatthew commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1099417152


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -463,9 +467,13 @@ private EventSubmitter buildEventSubmitter() {
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+  public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
     LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
         yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
+    if (startupInProgress) {
+      LOGGER.info("RequestTargetNumberOfContainers waiting for startup to complete first.");

Review Comment:
   The log is a bit confusing to those reading it, since most people won't know what this method call is. Consider a more informative log, such as:
   > The yarn service is still starting up. Unable to request containers from yarn until YarnService is finished starting up. 





[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3638: [GOBBLIN-1781] Helix offline instance purging is not thread safe in the yarn service

Posted by "Will-Lo (via GitHub)" <gi...@apache.org>.
Will-Lo commented on code in PR #3638:
URL: https://github.com/apache/gobblin/pull/3638#discussion_r1100589492


##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -463,9 +465,14 @@ private EventSubmitter buildEventSubmitter() {
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
    */

Review Comment:
   Update the javadoc for what the return value means in this case
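A sketch of how the javadoc could spell out the return value, assuming the boolean semantics introduced in this PR (`false` means the call was skipped because startup has not finished). `DocumentedService` is a hypothetical simplification: it takes an `int` instead of a `YarnContainerRequestBundle` and omits the actual container reconciliation.

```java
import java.util.Set;

// Hypothetical, simplified stand-in for YarnService.
class DocumentedService {
  private volatile boolean startupInProgress = true;

  void finishStartUp() {
    startupInProgress = false;
  }

  /**
   * Requests the target number of containers.
   *
   * @param numTargetContainers the desired total number of containers
   * @param inUseInstances a set of in-use Helix instances (unused in this sketch)
   * @return {@code true} if the request was processed; {@code false} if the
   *     service is still starting up and the request was skipped, in which
   *     case the caller may try again later
   */
  synchronized boolean requestTargetNumberOfContainers(int numTargetContainers, Set<String> inUseInstances) {
    if (startupInProgress) {
      return false;
    }
    // ... a real implementation would reconcile containers here ...
    return true;
  }
}
```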



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java:
##########
@@ -463,9 +465,14 @@ private EventSubmitter buildEventSubmitter() {
    * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+  public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
     LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
         yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
+    if (startupInProgress) {
+      LOGGER.info("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");

Review Comment:
   Nit: This should be a warn log
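A minimal sketch of the suggested change, raising the message to warning level. `YarnService` uses an SLF4J `LOGGER`, where this would be `LOGGER.warn(...)`; to keep the example dependency-free, `java.util.logging` stands in for SLF4J, and `WarnOnEarlyRequest` is a hypothetical class name.

```java
import java.util.logging.Logger;

class WarnOnEarlyRequest {
  // java.util.logging stands in for the SLF4J LOGGER used by YarnService;
  // Logger.warning(...) plays the role of SLF4J's LOGGER.warn(...).
  private static final Logger LOGGER = Logger.getLogger(WarnOnEarlyRequest.class.getName());

  private volatile boolean startupInProgress = true;

  void finishStartUp() {
    startupInProgress = false;
  }

  synchronized boolean requestTargetNumberOfContainers(int numTargetContainers) {
    if (startupInProgress) {
      // An early request is unexpected, so it is logged at warning level.
      LOGGER.warning("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");
      return false;
    }
    return true;
  }
}
```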


