You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2014/12/24 03:39:29 UTC
tez git commit: TEZ-1844. Shouldn't invoke system.exit in local mode
when AM is failed to start. (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 8343fc83f -> 7d51844df
TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start. (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d51844d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d51844d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d51844d
Branch: refs/heads/master
Commit: 7d51844df6fc7e7aa8b8163f40c6e13174df8403
Parents: 8343fc8
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Dec 24 10:38:57 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Dec 24 10:38:57 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/client/LocalClient.java | 33 +++++++++++++-------
.../apache/tez/dag/app/MockDAGAppMaster.java | 30 ++++++++++++++++--
.../org/apache/tez/dag/app/MockLocalClient.java | 18 +++++++++--
.../org/apache/tez/dag/app/MockTezClient.java | 8 +++++
.../tez/dag/app/TestMockDAGAppMaster.java | 33 ++++++++++++++++++++
6 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046c424..0b90d74 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start.
TEZ-1889. Fix test-patch to provide correct findbugs report.
TEZ-1313. Setup pre-commit build to test submitted patches.
TEZ-1856. Remove LocalOnFileSortedOutput, LocalMergedInput, LocalTaskOutputFiles.
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 9848159..68432f4 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
@@ -69,6 +70,7 @@ public class LocalClient extends FrameworkClient {
private int appIdNumber = 1;
private boolean isSession;
private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
+ private volatile Throwable amFailException = null;
public LocalClient() {
}
@@ -113,7 +115,7 @@ public class LocalClient extends FrameworkClient {
}
@Override
- public ApplicationId submitApplication(ApplicationSubmissionContext appContext) {
+ public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws IOException, YarnException {
ApplicationId appId = appContext.getApplicationId();
startDAGAppMaster(appContext);
return appId;
@@ -205,7 +207,7 @@ public class LocalClient extends FrameworkClient {
}
}
- protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) {
+ protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) throws IOException {
if (dagAmThread == null) {
try {
dagAmThread = createDAGAppMaster(appContext);
@@ -213,7 +215,7 @@ public class LocalClient extends FrameworkClient {
// Wait until DAGAppMaster is started
long waitingTime = 0;
- while (true) {
+ while (amFailException == null) {
if (dagAppMaster != null) {
DAGAppMasterState dagAMState = dagAppMaster.getState();
LOG.info("DAGAppMaster state: " + dagAMState);
@@ -240,8 +242,13 @@ public class LocalClient extends FrameworkClient {
}
} catch (Throwable t) {
LOG.fatal("Error starting DAGAppMaster", t);
- dagAmThread.interrupt();
- System.exit(0);
+ if (dagAmThread != null) {
+ dagAmThread.interrupt();
+ }
+ throw new IOException(t);
+ }
+ if (amFailException != null) {
+ throw new IOException(amFailException);
}
}
}
@@ -254,12 +261,13 @@ public class LocalClient extends FrameworkClient {
ApplicationId appId = appContext.getApplicationId();
// Set up working directory for DAGAppMaster
- Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
- LOG.info("Using staging directory: " + userDir.toUri().getPath());
+ Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
+ Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+ LOG.info("Using working directory: " + userDir.toUri().getPath());
FileSystem fs = FileSystem.get(conf);
- fs.mkdirs(userDir);
-
+ // copy data from staging directory to working directory to simulate the resource localizing
+ FileUtil.copy(fs, staging, fs, userDir, false, conf);
// Prepare Environment
Path logDir = new Path(userDir, "localmode-log-dir");
Path localDir = new Path(userDir, "localmode-local-dir");
@@ -296,9 +304,12 @@ public class LocalClient extends FrameworkClient {
clientHandler = new DAGClientHandler(dagAppMaster);
DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
- } catch (Throwable t) {
+ } catch (Throwable t) {
LOG.fatal("Error starting DAGAppMaster", t);
- System.exit(1);
+ if (dagAppMaster != null) {
+ dagAppMaster.stop();
+ }
+ amFailException = t;
}
}
});
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index a548e3c..d84f933 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -27,6 +27,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.TezApiVersionInfo;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.launcher.ContainerLauncher;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
@@ -60,8 +65,11 @@ import com.google.common.collect.Maps;
@SuppressWarnings("unchecked")
public class MockDAGAppMaster extends DAGAppMaster {
+ private static final Log LOG = LogFactory.getLog(MockDAGAppMaster.class);
MockContainerLauncher containerLauncher;
-
+ boolean initFailFlag;
+ boolean startFailFlag;
+
// mock container launcher does not launch real tasks.
// Upon launch of a container, it simulates the container asking for tasks
// Upon receiving a task it simulates completion of the tasks
@@ -295,11 +303,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
- boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag) {
+ boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag,
+ boolean initFailFlag, boolean startFailFlag) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
isSession, workingDirectory, new TezApiVersionInfo().getVersion());
containerLauncher = new MockContainerLauncher(launcherGoFlag);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
+ this.initFailFlag = initFailFlag;
+ this.startFailFlag = startFailFlag;
}
// use mock container launcher for tests
@@ -317,4 +328,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
return (MockDAGAppMasterShutdownHandler) this.shutdownHandler;
}
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ if (initFailFlag) {
+ throw new Exception("FailInit");
+ }
+ }
+
+ @Override
+ public synchronized void serviceStart() throws Exception {
+ super.serviceStart();
+ if (startFailFlag) {
+ throw new Exception("FailStart");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 2631e3c..49b7c6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -23,24 +23,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.LocalClient;
public class MockLocalClient extends LocalClient {
MockDAGAppMaster mockApp;
AtomicBoolean mockAppLauncherGoFlag;
Clock mockClock;
-
+ final boolean initFailFlag;
+ final boolean startFailFlag;
+
public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
this.mockClock = clock;
+ this.initFailFlag = false;
+ this.startFailFlag = false;
}
+ public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock,
+ boolean initFailFlag, boolean startFailFlag) {
+ this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+ this.mockClock = clock;
+ this.initFailFlag = initFailFlag;
+ this.startFailFlag = startFailFlag;
+ }
+
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String userDir) {
mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
- (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+ (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag,
+ initFailFlag, startFailFlag);
return mockApp;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
index 0ff3340..89df25c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -38,6 +38,14 @@ public class MockTezClient extends TezClient {
this.client = new MockLocalClient(mockAppLauncherGoFlag, clock);
}
+ MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
+ Map<String, LocalResource> localResources, Credentials credentials,
+ Clock clock, AtomicBoolean mockAppLauncherGoFlag,
+ boolean initFailFlag, boolean startFailFlag) {
+ super(name, tezConf, isSession, localResources, credentials);
+ this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, initFailFlag, startFailFlag);
+ }
+
protected FrameworkClient createFrameworkClient() {
return client;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 682e6ed..bed971a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -20,10 +20,12 @@ package org.apache.tez.dag.app;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -160,4 +162,35 @@ public class TestMockDAGAppMaster {
Assert.assertEquals(DAGState.RUNNING, mockApp.getContext().getCurrentDAG().getState());
}
+ @Test (timeout = 10000)
+ public void testInitFailed() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
+ null, null, null, new AtomicBoolean(false), true, false);
+ try {
+ tezClient.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertEquals("FailInit", e.getCause().getCause().getMessage());
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ // will time out if DAGAppMasterShutdownHook is not invoked
+ mockApp.waitForServiceToStop(Integer.MAX_VALUE);
+ }
+ }
+
+ @Test (timeout = 10000)
+ public void testStartFailed() {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
+ null, null, null, new AtomicBoolean(false), false, true);
+ try {
+ tezClient.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertEquals("FailStart", e.getCause().getCause().getMessage());
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ // will time out if DAGAppMasterShutdownHook is not invoked
+ mockApp.waitForServiceToStop(Integer.MAX_VALUE);
+ }
+ }
}