You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/08/12 00:27:53 UTC
svn commit: r1617377 [1/2] - in
/hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/asy...
Author: jing9
Date: Mon Aug 11 22:27:50 2014
New Revision: 1617377
URL: http://svn.apache.org/r1617377
Log:
Merging r1616894 through r1617376 from trunk.
Added:
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
- copied unchanged from r1617376, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
- copied unchanged from r1617376, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java
Removed:
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNewSavedEvent.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateSavedEvent.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptNewSavedEvent.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateSavedEvent.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt Mon Aug 11 22:27:50 2014
@@ -100,6 +100,19 @@ Release 2.6.0 - UNRELEASED
YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken
periodically. (xgong)
+ YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
+ (Ashwin Shankar via kasha)
+
+ YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
+
+ YARN-2302. Refactor TimelineWebServices. (Zhijie Shen via junping_du)
+
+ YARN-2337. ResourceManager sets ClientRMService in RMContext multiple times.
+ (Zhihai Xu via kasha)
+
+ YARN-2138. Cleaned up notifyDone* APIs in RMStateStore. (Varun Saxena via
+ jianhe)
+
OPTIMIZATIONS
BUG FIXES
@@ -154,6 +167,11 @@ Release 2.6.0 - UNRELEASED
YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available
capacity instead of configured max capacity. (Craig Welch via jianhe)
+ YARN-2400. Fixed TestAMRestart fails intermittently. (Jian He via xgong)
+
+ YARN-2361. RMAppAttempt state machine entries for KILLED state has duplicate
+ event entries. (Zhihai Xu via kasha)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Mon Aug 11 22:27:50 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClient.class);
/**
* Create a new instance of AMRMClient.
@@ -336,4 +340,63 @@ public abstract class AMRMClient<T exten
return nmTokenCache;
}
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ }
+
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every
+ * <code>checkEveryMillis</code> ms. While waiting, this method logs
+ * the message "Waiting in main loop." once every <code>logInterval</code>
+ * iterations to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval number of check iterations between log messages
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Mon Aug 11 22:27:50 2014
@@ -18,11 +18,15 @@
package org.apache.hadoop.yarn.client.api.async;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -90,6 +94,7 @@ import com.google.common.annotations.Vis
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest>
extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
protected final AMRMClient<T> client;
protected final CallbackHandler handler;
@@ -189,6 +194,65 @@ extends AbstractService {
*/
public abstract int getClusterNodeCount();
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every 1000 ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int)}
+ * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ */
+ public void waitFor(Supplier<Boolean> check) throws InterruptedException {
+ waitFor(check, 1000);
+ }
+
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every
+ * <code>checkEveryMillis</code> ms.
+ * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
+ throws InterruptedException {
+ waitFor(check, checkEveryMillis, 1);
+ };
+
+ /**
+ * Wait until <code>check</code> returns true, re-checking it every
+ * <code>checkEveryMillis</code> ms. While waiting, this method logs
+ * the message "Waiting in main loop." once every <code>logInterval</code>
+ * iterations to confirm the thread is alive.
+ * @param check user defined checker
+ * @param checkEveryMillis interval to call <code>check</code>
+ * @param logInterval number of check iterations between log messages
+ */
+ public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int logInterval) throws InterruptedException {
+ Preconditions.checkNotNull(check, "check should not be null");
+ Preconditions.checkArgument(checkEveryMillis >= 0,
+ "checkEveryMillis should be positive value");
+ Preconditions.checkArgument(logInterval >= 0,
+ "logInterval should be positive value");
+
+ int loggingCounter = logInterval;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Check the condition for main loop.");
+ }
+
+ boolean result = check.get();
+ if (result) {
+ LOG.info("Exits the main loop.");
+ return;
+ }
+ if (--loggingCounter <= 0) {
+ LOG.info("Waiting in main loop.");
+ loggingCounter = logInterval;
+ }
+
+ Thread.sleep(checkEveryMillis);
+ } while (true);
+ }
+
public interface CallbackHandler {
/**
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Mon Aug 11 22:27:50 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.async.impl;
+import com.google.common.base.Supplier;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
@@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
when(client.allocate(anyFloat())).thenThrow(ex);
- AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
@@ -228,6 +229,41 @@ public class TestAMRMClientAsync {
asyncClient.stop();
}
+ @Test (timeout = 10000)
+ public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
+ Configuration conf = new Configuration();
+ final TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ final AllocateResponse shutDownResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
+ shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
+ when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.reboot;
+ }
+ };
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
+
+ asyncClient.stop();
+ // stopping should have joined all threads and completed all callbacks
+ Assert.assertTrue(callbackHandler.callbackCount == 0);
+
+ verify(client, times(1)).allocate(anyFloat());
+ asyncClient.stop();
+ }
+
@Test (timeout = 5000)
public void testCallAMRMClientAsyncStopFromCallbackHandler()
throws YarnException, IOException, InterruptedException {
@@ -262,6 +298,40 @@ public class TestAMRMClientAsync {
}
}
+ @Test (timeout = 5000)
+ public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor()
+ throws YarnException, IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
+ @SuppressWarnings("unchecked")
+ AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+ List<ContainerStatus> completed = Arrays.asList(
+ ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ final AllocateResponse response = createAllocateResponse(completed,
+ new ArrayList<Container>(), null);
+
+ when(client.allocate(anyFloat())).thenReturn(response);
+
+ AMRMClientAsync<ContainerRequest> asyncClient =
+ AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+ callbackHandler.asynClient = asyncClient;
+ asyncClient.init(conf);
+ asyncClient.start();
+
+ Supplier<Boolean> checker = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callbackHandler.notify;
+ }
+ };
+
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+ asyncClient.waitFor(checker);
+ Assert.assertTrue(checker.get());
+ }
+
void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
InterruptedException, YarnException, IOException {
Configuration conf = new Configuration();
@@ -342,7 +412,7 @@ public class TestAMRMClientAsync {
private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers;
Exception savedException = null;
- boolean reboot = false;
+ volatile boolean reboot = false;
Object notifier = new Object();
int callbackCount = 0;
@@ -432,7 +502,7 @@ public class TestAMRMClientAsync {
@SuppressWarnings("rawtypes")
AMRMClientAsync asynClient;
boolean stop = true;
- boolean notify = false;
+ volatile boolean notify = false;
boolean throwOutException = false;
@Override
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Mon Aug 11 22:27:50 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.impl;
+import com.google.common.base.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -814,6 +815,40 @@ public class TestAMRMClient {
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
}
+
+ class CountDownSupplier implements Supplier<Boolean> {
+ int counter = 0;
+ @Override
+ public Boolean get() {
+ counter++;
+ if (counter >= 3) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ @Test
+ public void testWaitFor() throws InterruptedException {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ CountDownSupplier countDownChecker = new CountDownSupplier();
+
+ try {
+ // start am rm client
+ amClient =
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
+ .<ContainerRequest> createAMRMClient();
+ amClient.init(new YarnConfiguration());
+ amClient.start();
+ amClient.waitFor(countDownChecker, 1000);
+ assertEquals(3, countDownChecker.counter);
+ } finally {
+ if (amClient != null) {
+ amClient.stop();
+ }
+ }
+ }
private void sleep(int sleepTime) {
try {
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Mon Aug 11 22:27:50 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
@@ -59,12 +60,12 @@ public class ApplicationHistoryServer ex
private static final Log LOG = LogFactory
.getLog(ApplicationHistoryServer.class);
- protected ApplicationHistoryClientService ahsClientService;
- protected ApplicationHistoryManager historyManager;
- protected TimelineStore timelineStore;
- protected TimelineDelegationTokenSecretManagerService secretManagerService;
- protected TimelineACLsManager timelineACLsManager;
- protected WebApp webApp;
+ private ApplicationHistoryClientService ahsClientService;
+ private ApplicationHistoryManager historyManager;
+ private TimelineStore timelineStore;
+ private TimelineDelegationTokenSecretManagerService secretManagerService;
+ private TimelineDataManager timelineDataManager;
+ private WebApp webApp;
public ApplicationHistoryServer() {
super(ApplicationHistoryServer.class.getName());
@@ -72,15 +73,18 @@ public class ApplicationHistoryServer ex
@Override
protected void serviceInit(Configuration conf) throws Exception {
- historyManager = createApplicationHistory();
- ahsClientService = createApplicationHistoryClientService(historyManager);
- addService(ahsClientService);
- addService((Service) historyManager);
+ // init timeline services first
timelineStore = createTimelineStore(conf);
addIfService(timelineStore);
secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
addService(secretManagerService);
- timelineACLsManager = createTimelineACLsManager(conf);
+ timelineDataManager = createTimelineDataManager(conf);
+
+ // init generic history service afterwards
+ historyManager = createApplicationHistoryManager(conf);
+ ahsClientService = createApplicationHistoryClientService(historyManager);
+ addService(ahsClientService);
+ addService((Service) historyManager);
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
JvmMetrics.initSingleton("ApplicationHistoryServer", null);
@@ -111,21 +115,22 @@ public class ApplicationHistoryServer ex
@Private
@VisibleForTesting
- public ApplicationHistoryClientService getClientService() {
+ ApplicationHistoryClientService getClientService() {
return this.ahsClientService;
}
- protected ApplicationHistoryClientService
- createApplicationHistoryClientService(
- ApplicationHistoryManager historyManager) {
- return new ApplicationHistoryClientService(historyManager);
- }
-
- protected ApplicationHistoryManager createApplicationHistory() {
- return new ApplicationHistoryManagerImpl();
+ /**
+ * @return ApplicationTimelineStore
+ */
+ @Private
+ @VisibleForTesting
+ public TimelineStore getTimelineStore() {
+ return timelineStore;
}
- protected ApplicationHistoryManager getApplicationHistory() {
+ @Private
+ @VisibleForTesting
+ ApplicationHistoryManager getApplicationHistoryManager() {
return this.historyManager;
}
@@ -154,28 +159,35 @@ public class ApplicationHistoryServer ex
launchAppHistoryServer(args);
}
- protected ApplicationHistoryManager createApplicationHistoryManager(
+ private ApplicationHistoryClientService
+ createApplicationHistoryClientService(
+ ApplicationHistoryManager historyManager) {
+ return new ApplicationHistoryClientService(historyManager);
+ }
+
+ private ApplicationHistoryManager createApplicationHistoryManager(
Configuration conf) {
return new ApplicationHistoryManagerImpl();
}
- protected TimelineStore createTimelineStore(
+ private TimelineStore createTimelineStore(
Configuration conf) {
return ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
TimelineStore.class), conf);
}
- protected TimelineDelegationTokenSecretManagerService
+ private TimelineDelegationTokenSecretManagerService
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
return new TimelineDelegationTokenSecretManagerService();
}
- protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
- return new TimelineACLsManager(conf);
+ private TimelineDataManager createTimelineDataManager(Configuration conf) {
+ return new TimelineDataManager(
+ timelineStore, new TimelineACLsManager(conf));
}
- protected void startWebApp() {
+ private void startWebApp() {
Configuration conf = getConfig();
// Always load pseudo authentication filter to parse "user.name" in an URL
// to identify a HTTP request's user in insecure mode.
@@ -199,9 +211,8 @@ public class ApplicationHistoryServer ex
try {
AHSWebApp ahsWebApp = AHSWebApp.getInstance();
ahsWebApp.setApplicationHistoryManager(historyManager);
- ahsWebApp.setTimelineStore(timelineStore);
ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
- ahsWebApp.setTimelineACLsManager(timelineACLsManager);
+ ahsWebApp.setTimelineDataManager(timelineDataManager);
webApp =
WebApps
.$for("applicationhistory", ApplicationHistoryClientService.class,
@@ -213,14 +224,6 @@ public class ApplicationHistoryServer ex
throw new YarnRuntimeException(msg, e);
}
}
- /**
- * @return ApplicationTimelineStore
- */
- @Private
- @VisibleForTesting
- public TimelineStore getTimelineStore() {
- return timelineStore;
- }
private void doSecureLogin(Configuration conf) throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Mon Aug 11 22:27:50 2014
@@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.server.api.ApplicationContext;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@@ -36,9 +35,8 @@ import com.google.common.annotations.Vis
public class AHSWebApp extends WebApp implements YarnWebParams {
private ApplicationHistoryManager applicationHistoryManager;
- private TimelineStore timelineStore;
private TimelineDelegationTokenSecretManagerService secretManagerService;
- private TimelineACLsManager timelineACLsManager;
+ private TimelineDataManager timelineDataManager;
private static AHSWebApp instance = null;
@@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im
this.applicationHistoryManager = applicationHistoryManager;
}
- public TimelineStore getTimelineStore() {
- return timelineStore;
- }
-
- public void setTimelineStore(TimelineStore timelineStore) {
- this.timelineStore = timelineStore;
- }
-
public TimelineDelegationTokenSecretManagerService
getTimelineDelegationTokenSecretManagerService() {
return secretManagerService;
@@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im
this.secretManagerService = secretManagerService;
}
- public TimelineACLsManager getTimelineACLsManager() {
- return timelineACLsManager;
+ public TimelineDataManager getTimelineDataManager() {
+ return timelineDataManager;
}
- public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
- this.timelineACLsManager = timelineACLsManager;
+ public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
+ this.timelineDataManager = timelineDataManager;
}
@Override
@@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im
bind(TimelineWebServices.class);
bind(GenericExceptionHandler.class);
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
- bind(TimelineStore.class).toInstance(timelineStore);
bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
secretManagerService);
- bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+ bind(TimelineDataManager.class).toInstance(timelineDataManager);
route("/", AHSController.class);
route(pajoin("/apps", APP_STATE), AHSController.class);
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Mon Aug 11 22:27:50 2014
@@ -18,14 +18,10 @@
package org.apache.hadoop.yarn.server.timeline.webapp;
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -80,14 +73,11 @@ public class TimelineWebServices {
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
- private TimelineStore store;
- private TimelineACLsManager timelineACLsManager;
+ private TimelineDataManager timelineDataManager;
@Inject
- public TimelineWebServices(TimelineStore store,
- TimelineACLsManager timelineACLsManager) {
- this.store = store;
- this.timelineACLsManager = timelineACLsManager;
+ public TimelineWebServices(TimelineDataManager timelineDataManager) {
+ this.timelineDataManager = timelineDataManager;
}
@XmlRootElement(name = "about")
@@ -148,61 +138,28 @@ public class TimelineWebServices {
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
init(res);
- TimelineEntities entities = null;
try {
- EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
- boolean modified = extendFields(fieldEnums);
- UserGroupInformation callerUGI = getUser(req);
- entities = store.getEntities(
+ return timelineDataManager.getEntities(
parseStr(entityType),
- parseLongStr(limit),
+ parsePairStr(primaryFilter, ":"),
+ parsePairsStr(secondaryFilter, ",", ":"),
parseLongStr(windowStart),
parseLongStr(windowEnd),
parseStr(fromId),
parseLongStr(fromTs),
- parsePairStr(primaryFilter, ":"),
- parsePairsStr(secondaryFilter, ",", ":"),
- fieldEnums);
- if (entities != null) {
- Iterator<TimelineEntity> entitiesItr =
- entities.getEntities().iterator();
- while (entitiesItr.hasNext()) {
- TimelineEntity entity = entitiesItr.next();
- try {
- // check ACLs
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- entitiesItr.remove();
- } else {
- // clean up system data
- if (modified) {
- entity.setPrimaryFilters(null);
- } else {
- cleanupOwnerInfo(entity);
- }
- }
- } catch (YarnException e) {
- LOG.error("Error when verifying access for user " + callerUGI
- + " on the events of the timeline entity "
- + new EntityIdentifier(entity.getEntityId(),
- entity.getEntityType()), e);
- entitiesItr.remove();
- }
- }
- }
+ parseLongStr(limit),
+ parseFieldsStr(fields, ","),
+ getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("requested invalid field.");
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
- if (entities == null) {
- return new TimelineEntities();
- }
- return entities;
}
/**
@@ -220,33 +177,15 @@ public class TimelineWebServices {
init(res);
TimelineEntity entity = null;
try {
- EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
- boolean modified = extendFields(fieldEnums);
- entity =
- store.getEntity(parseStr(entityId), parseStr(entityType),
- fieldEnums);
- if (entity != null) {
- // check ACLs
- UserGroupInformation callerUGI = getUser(req);
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- entity = null;
- } else {
- // clean up the system data
- if (modified) {
- entity.setPrimaryFilters(null);
- } else {
- cleanupOwnerInfo(entity);
- }
- }
- }
+ entity = timelineDataManager.getEntity(
+ parseStr(entityType),
+ parseStr(entityId),
+ parseFieldsStr(fields, ","),
+ getUser(req));
} catch (IllegalArgumentException e) {
throw new BadRequestException(
"requested invalid field.");
- } catch (IOException e) {
- LOG.error("Error getting entity", e);
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- } catch (YarnException e) {
+ } catch (Exception e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
@@ -275,51 +214,23 @@ public class TimelineWebServices {
@QueryParam("windowEnd") String windowEnd,
@QueryParam("limit") String limit) {
init(res);
- TimelineEvents events = null;
try {
- UserGroupInformation callerUGI = getUser(req);
- events = store.getEntityTimelines(
+ return timelineDataManager.getEvents(
parseStr(entityType),
parseArrayStr(entityId, ","),
- parseLongStr(limit),
+ parseArrayStr(eventType, ","),
parseLongStr(windowStart),
parseLongStr(windowEnd),
- parseArrayStr(eventType, ","));
- if (events != null) {
- Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
- events.getAllEvents().iterator();
- while (eventsItr.hasNext()) {
- TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
- try {
- TimelineEntity entity = store.getEntity(
- eventsOfOneEntity.getEntityId(),
- eventsOfOneEntity.getEntityType(),
- EnumSet.of(Field.PRIMARY_FILTERS));
- // check ACLs
- if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
- eventsItr.remove();
- }
- } catch (Exception e) {
- LOG.error("Error when verifying access for user " + callerUGI
- + " on the events of the timeline entity "
- + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
- eventsOfOneEntity.getEntityType()), e);
- eventsItr.remove();
- }
- }
- }
+ parseLongStr(limit),
+ getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
- if (events == null) {
- return new TimelineEvents();
- }
- return events;
}
/**
@@ -333,9 +244,6 @@ public class TimelineWebServices {
@Context HttpServletResponse res,
TimelineEntities entities) {
init(res);
- if (entities == null) {
- return new TimelinePutResponse();
- }
UserGroupInformation callerUGI = getUser(req);
if (callerUGI == null) {
String msg = "The owner of the posted timeline entities is not set";
@@ -343,76 +251,8 @@ public class TimelineWebServices {
throw new ForbiddenException(msg);
}
try {
- List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
- TimelineEntities entitiesToPut = new TimelineEntities();
- List<TimelinePutResponse.TimelinePutError> errors =
- new ArrayList<TimelinePutResponse.TimelinePutError>();
- for (TimelineEntity entity : entities.getEntities()) {
- EntityIdentifier entityID =
- new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
-
- // check if there is existing entity
- TimelineEntity existingEntity = null;
- try {
- existingEntity =
- store.getEntity(entityID.getId(), entityID.getType(),
- EnumSet.of(Field.PRIMARY_FILTERS));
- if (existingEntity != null
- && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
- throw new YarnException("The timeline entity " + entityID
- + " was not put by " + callerUGI + " before");
- }
- } catch (Exception e) {
- // Skip the entity which already exists and was put by others
- LOG.warn("Skip the timeline entity: " + entityID + ", because "
- + e.getMessage());
- TimelinePutResponse.TimelinePutError error =
- new TimelinePutResponse.TimelinePutError();
- error.setEntityId(entityID.getId());
- error.setEntityType(entityID.getType());
- error.setErrorCode(
- TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
- errors.add(error);
- continue;
- }
-
- // inject owner information for the access check if this is the first
- // time to post the entity, in case it's the admin who is updating
- // the timeline data.
- try {
- if (existingEntity == null) {
- injectOwnerInfo(entity, callerUGI.getShortUserName());
- }
- } catch (YarnException e) {
- // Skip the entity which messes up the primary filter and record the
- // error
- LOG.warn("Skip the timeline entity: " + entityID + ", because "
- + e.getMessage());
- TimelinePutResponse.TimelinePutError error =
- new TimelinePutResponse.TimelinePutError();
- error.setEntityId(entityID.getId());
- error.setEntityType(entityID.getType());
- error.setErrorCode(
- TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
- errors.add(error);
- continue;
- }
-
- entityIDs.add(entityID);
- entitiesToPut.addEntity(entity);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
- + TimelineUtils.dumpTimelineRecordtoJSON(entity));
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
- }
- TimelinePutResponse response = store.put(entitiesToPut);
- // add the errors of timeline system filter key conflict
- response.addErrors(errors);
- return response;
- } catch (IOException e) {
+ return timelineDataManager.postEntities(entities, callerUGI);
+ } catch (Exception e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
@@ -423,6 +263,15 @@ public class TimelineWebServices {
response.setContentType(null);
}
+ private static UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUGI;
+ }
+
private static SortedSet<String> parseArrayStr(String str, String delimiter) {
if (str == null) {
return null;
@@ -495,14 +344,6 @@ public class TimelineWebServices {
}
}
- private static boolean extendFields(EnumSet<Field> fieldEnums) {
- boolean modified = false;
- if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
- fieldEnums.add(Field.PRIMARY_FILTERS);
- modified = true;
- }
- return modified;
- }
private static Long parseLongStr(String str) {
return str == null ? null : Long.parseLong(str.trim());
}
@@ -511,34 +352,4 @@ public class TimelineWebServices {
return str == null ? null : str.trim();
}
- private static UserGroupInformation getUser(HttpServletRequest req) {
- String remoteUser = req.getRemoteUser();
- UserGroupInformation callerUGI = null;
- if (remoteUser != null) {
- callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return callerUGI;
- }
-
- private static void injectOwnerInfo(TimelineEntity timelineEntity,
- String owner) throws YarnException {
- if (timelineEntity.getPrimaryFilters() != null &&
- timelineEntity.getPrimaryFilters().containsKey(
- TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
- throw new YarnException(
- "User should not use the timeline system filter key: "
- + TimelineStore.SystemFilter.ENTITY_OWNER);
- }
- timelineEntity.addPrimaryFilter(
- TimelineStore.SystemFilter.ENTITY_OWNER
- .toString(), owner);
- }
-
- private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
- if (timelineEntity.getPrimaryFilters() != null) {
- timelineEntity.getPrimaryFilters().remove(
- TimelineStore.SystemFilter.ENTITY_OWNER.toString());
- }
- }
-
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Mon Aug 11 22:27:50 2014
@@ -69,7 +69,7 @@ public class TestApplicationHistoryClien
historyServer.init(config);
historyServer.start();
store =
- ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
+ ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
.getHistoryStore();
}
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Mon Aug 11 22:27:50 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
@@ -89,14 +90,15 @@ public class TestTimelineWebServices ext
} catch (Exception e) {
Assert.fail();
}
- bind(TimelineStore.class).toInstance(store);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
timelineACLsManager = new TimelineACLsManager(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
adminACLsManager = new AdminACLsManager(conf);
- bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
+ TimelineDataManager timelineDataManager =
+ new TimelineDataManager(store, timelineACLsManager);
+ bind(TimelineDataManager.class).toInstance(timelineDataManager);
serve("/*").with(GuiceContainer.class);
TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
FilterConfig filterConfig = mock(FilterConfig.class);
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Aug 11 22:27:50 2014
@@ -461,7 +461,6 @@ public class ResourceManager extends Com
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
clientRM = createClientRMService();
- rmContext.setClientRMService(clientRM);
addService(clientRM);
rmContext.setClientRMService(clientRM);
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Mon Aug 11 22:27:50 2014
@@ -52,13 +52,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -132,7 +132,8 @@ public abstract class RMStateStore exten
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appStateData);
- store.notifyDoneStoringApplication(appId, null);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e);
@@ -156,7 +157,8 @@ public abstract class RMStateStore exten
LOG.info("Updating info for app: " + appId);
try {
store.updateApplicationStateInternal(appId, appStateData);
- store.notifyDoneUpdatingApplication(appId, null);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_UPDATE_SAVED));
} catch (Exception e) {
LOG.error("Error updating app: " + appId, e);
store.notifyStoreOperationFailed(e);
@@ -205,8 +207,9 @@ public abstract class RMStateStore exten
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
- store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- null);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
@@ -233,8 +236,9 @@ public abstract class RMStateStore exten
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
- store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
- null);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
} catch (Exception e) {
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
@@ -801,47 +805,28 @@ public abstract class RMStateStore exten
}
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application that new application is stored in state store
- * @param appId id of the application that has been saved
- * @param storedException the exception that is thrown when storing the
- * application
- */
- private void notifyDoneStoringApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppNewSavedEvent(appId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppUpdateSavedEvent(appId, storedException));
+ * This method is called to notify the application that
+ * new application is stored or updated in state store
+ * @param event App event containing the app id and event type
+ */
+ private void notifyApplication(RMAppEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application attempt that new attempt is stored in state store
- * @param appAttempt attempt that has been saved
- */
- private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptNewSavedEvent(attemptId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
- Exception updatedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+ * This method is called to notify the application attempt
+ * that new attempt is stored or updated in state store
+ * @param event App attempt event containing the app attempt
+ * id and event type
+ */
+ private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Aug 11 22:27:50 2014
@@ -820,17 +820,6 @@ public class RMAppImpl implements RMApp,
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- if (event instanceof RMAppNewSavedEvent) {
- RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
- // For HA this exception needs to be handled by giving up
- // master status if we got fenced
- if (((RMAppNewSavedEvent) event).getStoredException() != null) {
- LOG.error(
- "Failed to store application: " + storeEvent.getApplicationId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user));
}
@@ -848,13 +837,6 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application"
- + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Aug 11 22:27:50 2014
@@ -80,11 +80,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -398,7 +396,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
- RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.EXPIRE,
@@ -906,8 +903,6 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
-
appAttempt.launchAttempt();
}
}
@@ -1059,14 +1054,6 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application attempt: "
- + storeEvent.getApplicationAttemptId(),
- storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1196,8 +1183,6 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
-
// create AMRMToken
appAttempt.amrmToken =
appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
@@ -1690,18 +1675,6 @@ public class RMAppAttemptImpl implements
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
}
- private void checkAttemptStoreError(RMAppAttemptEvent event) {
- RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
- if(storeEvent.getStoredException() != null)
- {
- // This needs to be handled for HA and give up master status if we got
- // fenced
- LOG.error("Failed to store attempt: " + getAppAttemptId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
-
private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Aug 11 22:27:50 2014
@@ -116,6 +116,18 @@ public abstract class Schedulable {
return fairShare;
}
+ /**
+ * Tells whether this schedulable is active: an FSQueue is active only while
+ * getNumRunnableApps() is positive; any other Schedulable is always active.
+ */
+ public boolean isActive() {
+ if (this instanceof FSQueue) {
+ FSQueue queue = (FSQueue) this;
+ return queue.getNumRunnableApps() > 0;
+ }
+ return true;
+ }
+
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Mon Aug 11 22:27:50 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res
public class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
-
+
+ /**
+ * Compute the fair share of the given schedulables. Only active schedulables
+ * (those whose isActive() returns true) are handed to computeSharesInternal;
+ * each inactive one gets setResourceValue(0, ...) applied to its fair share.
+ *
+ * @param schedulables all candidate schedulables, active or not
+ * @param totalResources passed unchanged to computeSharesInternal
+ * @param type the resource type passed to setResourceValue and computeSharesInternal
+ */
+ public static void computeShares(
+ Collection<? extends Schedulable> schedulables, Resource totalResources,
+ ResourceType type) {
+ Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
+ for (Schedulable sched : schedulables) {
+ if (sched.isActive()) {
+ activeSchedulables.add(sched);
+ } else {
+ setResourceValue(0, sched.getFairShare(), type);
+ }
+ }
+
+ computeSharesInternal(activeSchedulables, totalResources, type);
+ }
+
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
@@ -75,7 +100,7 @@ public class ComputeFairShares {
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
- public static void computeShares(
+ private static void computeSharesInternal(
Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) {
if (schedulables.isEmpty()) {
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Mon Aug 11 22:27:50 2014
@@ -386,7 +386,8 @@ public class TestAMRestart {
ApplicationState appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// AM should be restarted even though max-am-attempt is 1.
- MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ MockAM am2 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
@@ -398,7 +399,8 @@ public class TestAMRestart {
am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ MockAM am3 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
@@ -421,7 +423,8 @@ public class TestAMRestart {
.getAMContainerExitStatus());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ MockAM am4 =
+ rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Aug 11 22:27:50 2014
@@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -77,10 +77,9 @@ public class RMStateStoreTestBase extend
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
static class TestDispatcher implements
- Dispatcher, EventHandler<RMAppAttemptNewSavedEvent> {
+ Dispatcher, EventHandler<RMAppAttemptEvent> {
ApplicationAttemptId attemptId;
- Exception storedException;
boolean notified = false;
@@ -91,9 +90,8 @@ public class RMStateStoreTestBase extend
}
@Override
- public void handle(RMAppAttemptNewSavedEvent event) {
+ public void handle(RMAppAttemptEvent event) {
assertEquals(attemptId, event.getApplicationAttemptId());
- assertEquals(storedException, event.getStoredException());
notified = true;
synchronized (this) {
notifyAll();
@@ -163,7 +161,6 @@ public class RMStateStoreTestBase extend
when(mockAttempt.getClientTokenMasterKey())
.thenReturn(clientTokenMasterKey);
dispatcher.attemptId = attemptId;
- dispatcher.storedException = null;
store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher);
return container.getId();
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Mon Aug 11 22:27:50 2014
@@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -328,15 +327,15 @@ public class TestRMAppTransitions {
private void sendAppUpdateSavedEvent(RMApp application) {
RMAppEvent event =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(event);
rmDispatcher.await();
}
private void sendAttemptUpdateSavedEvent(RMApp application) {
application.getCurrentAppAttempt().handle(
- new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
- .getAppAttemptId(), null));
+ new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
}
protected RMApp testCreateAppNewSaving(
@@ -357,7 +356,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppNewSaving(submissionContext);
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
RMAppEvent event =
- new RMAppNewSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@@ -422,7 +421,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppFinalSaving(submissionContext);
// FINAL_SAVING => FINISHING event RMAppEventType.APP_UPDATED
RMAppEvent appUpdated =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(appUpdated);
assertAppState(RMAppState.FINISHING, application);
assertTimesAtFinish(application);
@@ -763,7 +762,7 @@ public class TestRMAppTransitions {
application.handle(event);
assertAppState(RMAppState.FINAL_SAVING, application);
RMAppEvent appUpdated =
- new RMAppUpdateSavedEvent(application.getApplicationId(), null);
+ new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED);
application.handle(appUpdated);
assertAppState(RMAppState.FINISHED, application);
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1617377&r1=1617376&r2=1617377&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Mon Aug 11 22:27:50 2014
@@ -81,10 +81,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -570,15 +568,15 @@ public class TestRMAppAttemptTransitions
submitApplicationAttempt();
applicationAttempt.handle(
new RMAppAttemptEvent(
- applicationAttempt.getAppAttemptId(),
+ applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.ATTEMPT_ADDED));
if(unmanagedAM){
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
applicationAttempt.getAppAttemptState());
applicationAttempt.handle(
- new RMAppAttemptNewSavedEvent(
- applicationAttempt.getAppAttemptId(), null));
+ new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
}
testAppAttemptScheduledState();
@@ -616,8 +614,8 @@ public class TestRMAppAttemptTransitions
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState());
applicationAttempt.handle(
- new RMAppAttemptNewSavedEvent(
- applicationAttempt.getAppAttemptId(), null));
+ new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
testAppAttemptAllocatedState(container);
@@ -696,8 +694,8 @@ public class TestRMAppAttemptTransitions
assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState());
applicationAttempt.handle(
- new RMAppAttemptUpdateSavedEvent(
- applicationAttempt.getAppAttemptId(), null));
+ new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
}
@Test