You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2014/12/24 03:39:29 UTC

tez git commit: TEZ-1844. Shouldn't invoke System.exit in local mode when the AM fails to start. (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 8343fc83f -> 7d51844df


TEZ-1844. Shouldn't invoke System.exit in local mode when the AM fails to start. (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d51844d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d51844d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d51844d

Branch: refs/heads/master
Commit: 7d51844df6fc7e7aa8b8163f40c6e13174df8403
Parents: 8343fc8
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Dec 24 10:38:57 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Dec 24 10:38:57 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/client/LocalClient.java | 33 +++++++++++++-------
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 30 ++++++++++++++++--
 .../org/apache/tez/dag/app/MockLocalClient.java | 18 +++++++++--
 .../org/apache/tez/dag/app/MockTezClient.java   |  8 +++++
 .../tez/dag/app/TestMockDAGAppMaster.java       | 33 ++++++++++++++++++++
 6 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046c424..0b90d74 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1844. Shouldn't invoke system.exit in local mode when AM is failed to start.
   TEZ-1889. Fix test-patch to provide correct findbugs report.
   TEZ-1313. Setup pre-commit build to test submitted patches.
   TEZ-1856. Remove LocalOnFileSortedOutput, LocalMergedInput, LocalTaskOutputFiles.

http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 9848159..68432f4 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -69,6 +70,7 @@ public class LocalClient extends FrameworkClient {
   private int appIdNumber = 1;
   private boolean isSession;
   private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
+  private volatile Throwable amFailException = null;
 
   public LocalClient() {
   }
@@ -113,7 +115,7 @@ public class LocalClient extends FrameworkClient {
   }
 
   @Override
-  public ApplicationId submitApplication(ApplicationSubmissionContext appContext) {
+  public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws IOException, YarnException {
     ApplicationId appId = appContext.getApplicationId();
     startDAGAppMaster(appContext);
     return appId;
@@ -205,7 +207,7 @@ public class LocalClient extends FrameworkClient {
     }
   }
 
-  protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) {
+  protected void startDAGAppMaster(final ApplicationSubmissionContext appContext) throws IOException {
     if (dagAmThread == null) {
       try {
         dagAmThread = createDAGAppMaster(appContext);
@@ -213,7 +215,7 @@ public class LocalClient extends FrameworkClient {
 
         // Wait until DAGAppMaster is started
         long waitingTime = 0;
-        while (true) {
+        while (amFailException == null) {
           if (dagAppMaster != null) {
             DAGAppMasterState dagAMState = dagAppMaster.getState();
             LOG.info("DAGAppMaster state: " + dagAMState);
@@ -240,8 +242,13 @@ public class LocalClient extends FrameworkClient {
         }
       } catch (Throwable t) {
         LOG.fatal("Error starting DAGAppMaster", t);
-        dagAmThread.interrupt();
-        System.exit(0);
+        if (dagAmThread != null) {
+          dagAmThread.interrupt();
+        }
+        throw new IOException(t);
+      }
+      if (amFailException != null) {
+        throw new IOException(amFailException);
       }
     }
   }
@@ -254,12 +261,13 @@ public class LocalClient extends FrameworkClient {
           ApplicationId appId = appContext.getApplicationId();
 
           // Set up working directory for DAGAppMaster
-          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
-          LOG.info("Using staging directory: " + userDir.toUri().getPath());
+          Path staging = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString());
+          Path userDir = TezCommonUtils.getTezSystemStagingPath(conf, appId.toString()+"_wd");
+          LOG.info("Using working directory: " + userDir.toUri().getPath());
 
           FileSystem fs = FileSystem.get(conf);
-          fs.mkdirs(userDir);
-
+          // copy data from staging directory to working directory to simulate the resource localizing
+          FileUtil.copy(fs, staging, fs, userDir, false, conf);
           // Prepare Environment
           Path logDir = new Path(userDir, "localmode-log-dir");
           Path localDir = new Path(userDir, "localmode-local-dir");
@@ -296,9 +304,12 @@ public class LocalClient extends FrameworkClient {
           clientHandler = new DAGClientHandler(dagAppMaster);
           DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
 
-          } catch (Throwable t) {
+        } catch (Throwable t) {
           LOG.fatal("Error starting DAGAppMaster", t);
-          System.exit(1);
+          if (dagAppMaster != null) {
+            dagAppMaster.stop();
+          }
+          amFailException = t;
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index a548e3c..d84f933 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -27,6 +27,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
@@ -60,8 +65,11 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("unchecked")
 public class MockDAGAppMaster extends DAGAppMaster {
   
+  private static final Log LOG = LogFactory.getLog(MockDAGAppMaster.class);
   MockContainerLauncher containerLauncher;
-  
+  boolean initFailFlag;
+  boolean startFailFlag;
+
   // mock container launcher does not launch real tasks.
   // Upon, launch of a container is simulates the container asking for tasks
   // Upon receiving a task it simulates completion of the tasks
@@ -295,11 +303,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
   public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
       String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
-      boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag) {
+      boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag,
+      boolean initFailFlag, boolean startFailFlag) {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
         isSession, workingDirectory, new TezApiVersionInfo().getVersion());
     containerLauncher = new MockContainerLauncher(launcherGoFlag);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
+    this.initFailFlag = initFailFlag;
+    this.startFailFlag = startFailFlag;
   }
   
   // use mock container launcher for tests
@@ -317,4 +328,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
     return (MockDAGAppMasterShutdownHandler) this.shutdownHandler;
   }
 
+  @Override
+  public synchronized void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    if (initFailFlag) {
+      throw new Exception("FailInit");
+    }
+  }
+
+  @Override
+  public synchronized void serviceStart() throws Exception {
+    super.serviceStart();
+    if (startFailFlag) {
+      throw new Exception("FailStart");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 2631e3c..49b7c6f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -23,24 +23,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.LocalClient;
 
 public class MockLocalClient extends LocalClient {
   MockDAGAppMaster mockApp;
   AtomicBoolean mockAppLauncherGoFlag;
   Clock mockClock;
-  
+  final boolean initFailFlag;
+  final boolean startFailFlag;
+
   public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
     this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
     this.mockClock = clock;
+    this.initFailFlag = false;
+    this.startFailFlag = false;
   }
   
+  public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock,
+      boolean initFailFlag, boolean startFailFlag) {
+    this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+    this.mockClock = clock;
+    this.initFailFlag = initFailFlag;
+    this.startFailFlag = startFailFlag;
+  }
+
   protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String userDir) {
     mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
-        (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+        (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag,
+        initFailFlag, startFailFlag);
     return mockApp;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
index 0ff3340..89df25c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -38,6 +38,14 @@ public class MockTezClient extends TezClient {
     this.client = new MockLocalClient(mockAppLauncherGoFlag, clock);
   }
   
+  MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
+      Map<String, LocalResource> localResources, Credentials credentials,
+      Clock clock, AtomicBoolean mockAppLauncherGoFlag,
+      boolean initFailFlag, boolean startFailFlag) {
+    super(name, tezConf, isSession, localResources, credentials);
+    this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, initFailFlag, startFailFlag);
+  }
+
   protected FrameworkClient createFrameworkClient() {
     return client;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/7d51844d/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 682e6ed..bed971a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -20,10 +20,12 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -160,4 +162,35 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(DAGState.RUNNING, mockApp.getContext().getCurrentDAG().getState());
   }
 
+  @Test (timeout = 10000)
+  public void testInitFailed() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
+      null, null, null, new AtomicBoolean(false), true, false);
+    try {
+      tezClient.start();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertEquals("FailInit", e.getCause().getCause().getMessage());
+      MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+      // will timeout if DAGAppMasterShutdownHook is not invoked
+      mockApp.waitForServiceToStop(Integer.MAX_VALUE);
+    }
+  }
+
+  @Test (timeout = 10000)
+  public void testStartFailed() {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
+      null, null, null, new AtomicBoolean(false), false, true);
+    try {
+      tezClient.start();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertEquals("FailStart", e.getCause().getCause().getMessage());
+      MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+      // will timeout if DAGAppMasterShutdownHook is not invoked
+      mockApp.waitForServiceToStop(Integer.MAX_VALUE);
+    }
+  }
 }