You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/08/09 20:44:52 UTC
svn commit: r1617002 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/
hadoop-yarn/had...
Author: zjshen
Date: Sat Aug 9 18:44:51 2014
New Revision: 1617002
URL: http://svn.apache.org/r1617002
Log:
YARN-1954. Added waitFor to AMRMClient(Async). Contributed by Tsuyoshi Ozawa.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1617002&r1=1617001&r2=1617002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Aug 9 18:44:51 2014
@@ -103,6 +103,8 @@ Release 2.6.0 - UNRELEASED
YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
(Ashwin Shankar via kasha)
+ YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1617002&r1=1617001&r2=1617002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Sat Aug 9 18:44:51 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClient.class);
/**
* Create a new instance of AMRMClient.
@@ -336,4 +340,63 @@ public abstract class AMRMClient<T exten
return nmTokenCache;
}
+ /**
+ * Wait for <code>check</code> to return true for each 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+ * the message "waiting in main loop" for each <code>logInterval</code> times
+ * iteration to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval interval to log for each
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1617002&r1=1617001&r2=1617002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Sat Aug 9 18:44:51 2014
@@ -18,11 +18,15 @@
package org.apache.hadoop.yarn.client.api.async;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -90,6 +94,7 @@ import com.google.common.annotations.Vis
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest>
extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
protected final AMRMClient<T> client;
protected final CallbackHandler handler;
@@ -189,6 +194,65 @@ extends AbstractService {
*/
public abstract int getClusterNodeCount();
+ /**
+ * Wait for <code>check</code> to return true for each 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ };
+
+ /**
+ * Wait for <code>check</code> to return true for each
+ * <code>checkEveryMillis</code> ms. In the main loop, this method will log
+ * the message "waiting in main loop" for each <code>logInterval</code> times
+ * iteration to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval interval to log for each
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
public interface CallbackHandler {
/**
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1617002&r1=1617001&r2=1617002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Sat Aug 9 18:44:51 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.async.impl;
+import com.google.common.base.Supplier;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
@@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
when(client.allocate(anyFloat())).thenThrow(ex);
- AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
@@ -228,6 +229,41 @@ public class TestAMRMClientAsync {
asyncClient.stop();
}
+ @Test (timeout = 10000)
+ public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
+ Configuration conf = new Configuration();
+ final TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ final AllocateResponse shutDownResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+ shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+ when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.reboot;
+ }
+ };
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertTrue(callbackHandler.callbackCount == 0);
+
+ verify(client, times(1)).allocate(anyFloat());
+ asyncClient.stop();
+ }
+
@Test (timeout = 5000)
public void testCallAMRMClientAsyncStopFromCallbackHandler()
throws YarnException, IOException, InterruptedException {
@@ -262,6 +298,40 @@ public class TestAMRMClientAsync {
}
}
+ @Test (timeout = 5000)
+ public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor()
+ throws YarnException, IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ List<ContainerStatus> completed = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ final AllocateResponse response = createAllocateResponse(completed,
+ new ArrayList<Container>(), null);
+
+ when(client.allocate(anyFloat())).thenReturn(response);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ callbackHandler.asynClient = asyncClient;
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.notify;
+ }
+ };
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
+ Assert.assertTrue(checker.get());
+ }
+
void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
InterruptedException, YarnException, IOException {
Configuration conf = new Configuration();
@@ -342,7 +412,7 @@ public class TestAMRMClientAsync {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
Exception savedException = null;
- boolean reboot = false;
+ volatile boolean reboot = false;
Object notifier = new Object();
int callbackCount = 0;
@@ -432,7 +502,7 @@ public class TestAMRMClientAsync {
@SuppressWarnings("rawtypes")
AMRMClientAsync asynClient;
boolean stop = true;
- boolean notify = false;
+ volatile boolean notify = false;
boolean throwOutException = false;
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1617002&r1=1617001&r2=1617002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Sat Aug 9 18:44:51 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.impl;
+import com.google.common.base.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -814,6 +815,40 @@ public class TestAMRMClient {
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
}
+
+ class CountDownSupplier implements Supplier<Boolean> {
+ int counter = 0;
+ @Override
+ public Boolean get() {
+ counter++;
+ if (counter >= 3) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ @Test
+ public void testWaitFor() throws InterruptedException {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ CountDownSupplier countDownChecker = new CountDownSupplier();
+
+ try {
+ // start am rm client
+ amClient =
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
+ .<ContainerRequest> createAMRMClient();
+ amClient.init(new YarnConfiguration());
+ amClient.start();
+ amClient.waitFor(countDownChecker, 1000);
+ assertEquals(3, countDownChecker.counter);
+ } finally {
+ if (amClient != null) {
+ amClient.stop();
+ }
+ }
+ }
private void sleep(int sleepTime) {
try {