You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/03/03 21:19:27 UTC

tez git commit: TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 3f5a7f35d -> 2af886b50


TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2af886b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2af886b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2af886b5

Branch: refs/heads/master
Commit: 2af886b509015200e1c04527275474cbc771c667
Parents: 3f5a7f3
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 3 12:19:12 2016 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 3 12:19:12 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/api/client/DAGClient.java    |   1 -
 .../tez/dag/api/client/DAGClientImpl.java       |  24 +++-
 .../tez/dag/api/client/DAGClientInternal.java   | 116 +++++++++++++++++++
 .../dag/api/client/DAGClientTimelineImpl.java   |   4 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |  40 ++++---
 6 files changed, 169 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a7ae58..0c6cacc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
   TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3151. Expose DAG credentials to plugins.
   TEZ-3149. Tez-tools: Add username in DagInfo.
@@ -396,6 +397,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
   TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3149. Tez-tools: Add username in DagInfo.
   TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.

http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 27b316b..9b11b96 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.tez.dag.api.TezException;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index fa22c32..af67ee8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -30,6 +30,8 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -58,7 +60,7 @@ public class DAGClientImpl extends DAGClient {
   private final FrameworkClient frameworkClient;
 
   @VisibleForTesting
-  protected DAGClient realClient;
+  protected DAGClientInternal realClient;
   private boolean dagCompleted = false;
   @VisibleForTesting
   protected boolean isATSEnabled = false;
@@ -240,6 +242,9 @@ public class DAGClientImpl extends DAGClient {
         if (dagStatus.isCompleted()) {
           return dagStatus;
         }
+      } catch (ApplicationNotFoundException e) {
+        LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline"
+            + " - Application not found by YARN", e);
       } catch (TezException e) {
         if (LOG.isDebugEnabled()) {
           LOG.info("DAGStatus fetch failed." + e.getMessage());
@@ -291,6 +296,10 @@ public class DAGClientImpl extends DAGClient {
         if (vertexCompletionStates.contains(vertexStatus.getState())) {
           return vertexStatus;
         }
+      } catch (ApplicationNotFoundException e) {
+        LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline"
+            + " - Application not found by YARN", e);
+        return null;
       } catch (TezException e) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage());
@@ -350,6 +359,10 @@ public class DAGClientImpl extends DAGClient {
     try {
       dagStatus = realClient.getDAGStatus(statusOptions, timeout);
     } catch (DAGNotRunningException e) {
+      LOG.info("DAG is no longer running", e);
+      dagCompleted = true;
+    } catch (ApplicationNotFoundException e) {
+      LOG.info("DAG is no longer running - application not found by YARN", e);
       dagCompleted = true;
     } catch (TezException e) {
       // can be either due to a n/w issue of due to AM completed.
@@ -370,6 +383,10 @@ public class DAGClientImpl extends DAGClient {
     try {
       vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
     } catch (DAGNotRunningException e) {
+      LOG.info("DAG is no longer running", e);
+      dagCompleted = true;
+    } catch (ApplicationNotFoundException e) {
+      LOG.info("DAG is no longer running - application not found by YARN", e);
       dagCompleted = true;
     } catch (TezException e) {
       // can be either due to a n/w issue of due to AM completed.
@@ -398,6 +415,9 @@ public class DAGClientImpl extends DAGClient {
     ApplicationReport appReport;
     try {
       appReport = frameworkClient.getApplicationReport(appId);
+    } catch (ApplicationNotFoundException e) {
+      LOG.info("DAG is no longer running - application not found by YARN", e);
+      throw new DAGNotRunningException(e);
     } catch (YarnException e) {
       throw new TezException(e);
     }
@@ -592,7 +612,7 @@ public class DAGClientImpl extends DAGClient {
   }
 
   @VisibleForTesting
-  public DAGClient getRealClient() {
+  public DAGClientInternal getRealClient() {
     return realClient;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
new file mode 100644
index 0000000..bb236a3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * Private internal class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Application Master.
+ */
+@Private
+public abstract class DAGClientInternal implements Closeable {
+
+  /**
+   * Gets DAG execution context for use with logging
+   * @return summary of DAG execution
+   */
+  public abstract String getExecutionContext();
+
+  /**
+   * Get the YARN ApplicationReport for the app running the DAG. For performance
+   * reasons this may be a stale copy and should be used to access static info.
+   * It may be null.
+   * @return <code>ApplicationReport</code> or null
+   */
+  @Private
+  protected abstract ApplicationReport getApplicationReportInternal();
+
+  /**
+   * Get the status of the specified DAG
+   * @param statusOptions Optionally, retrieve additional information based on
+   *                      specified options. To retrieve basic information, this can be null
+   * @throws ApplicationNotFoundException if YARN does not know about the application
+   */
+  public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException, ApplicationNotFoundException;
+
+  /**
+   * Get the status of the specified DAG when it reaches a final state, or the timeout expires.
+   * @param statusOptions Optionally, retrieve additional information based on
+   *                      specified options. To retrieve basic information, this can be null
+   * @param timeout RPC call timeout. Value -1 waits indefinitely and returns when
+   *                DAG reaches final state
+   * @return DAG Status
+   * @throws IOException
+   * @throws TezException
+   * @throws ApplicationNotFoundException if YARN does not know about the application
+   */
+  public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+      long timeout)
+      throws IOException, TezException, ApplicationNotFoundException;
+
+  /**
+   * Get the status of a Vertex of a DAG
+   * @param vertexName name of the vertex whose status is requested
+   * @param statusOptions Optionally, retrieve additional information based on specified options
+   */
+  public abstract VertexStatus getVertexStatus(String vertexName,
+      Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException, ApplicationNotFoundException;
+
+  /**
+   * Kill a running DAG
+   * @throws IOException
+   * @throws TezException
+   */
+  public abstract void tryKillDAG() throws IOException, TezException;
+
+  /**
+   * Wait for DAG to complete without printing any vertex statuses
+   * @return Final DAG Status
+   * @throws IOException
+   * @throws TezException
+   * @throws InterruptedException
+   */
+  public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException;
+
+  /**
+   * Wait for DAG to complete and periodically print *all* vertices' status.
+   * @param statusGetOpts
+   *          : status get options. For example, to get counter pass
+   *          <code>EnumSet.of(StatusGetOpts.GET_COUNTERS)</code>
+   * @return Final DAG Status
+   * @throws IOException
+   * @throws TezException
+   * @throws InterruptedException
+   */
+  public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
+      throws IOException, TezException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 2294759..ffd91b7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -37,6 +37,8 @@ import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.UniformInterfaceException;
 import com.sun.jersey.api.client.WebResource;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
 
 
 @Private
-public class DAGClientTimelineImpl extends DAGClient {
+public class DAGClientTimelineImpl extends DAGClientInternal {
   private static final Logger LOG = LoggerFactory.getLogger(DAGClientTimelineImpl.class);
 
   private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";

http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 240289c..ff48755 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,7 +23,10 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.tez.common.RPCUtil;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.client.DAGClientInternal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -51,7 +54,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
 @Private
-public class DAGClientRPCImpl extends DAGClient {
+public class DAGClientRPCImpl extends DAGClientInternal {
 
   private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class);
 
@@ -82,15 +85,15 @@ public class DAGClientRPCImpl extends DAGClient {
 
   @Override
   public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
-      throws IOException, TezException {
+      throws IOException, TezException, ApplicationNotFoundException {
     return getDAGStatus(statusOptions, 0);
   }
 
 
   @Override
   public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
-      long timeout) throws IOException, TezException {
-    if(createAMProxyIfNeeded()) {
+      long timeout) throws IOException, TezException, ApplicationNotFoundException {
+    if (createAMProxyIfNeeded()) {
       try {
         DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
         return dagStatus;
@@ -110,7 +113,8 @@ public class DAGClientRPCImpl extends DAGClient {
   @Override
   public VertexStatus getVertexStatus(String vertexName,
       Set<StatusGetOpts> statusOptions)
-      throws IOException, TezException {
+      throws IOException, TezException, ApplicationNotFoundException {
+
     if(createAMProxyIfNeeded()) {
       try {
         return getVertexStatusViaAM(vertexName, statusOptions);
@@ -133,14 +137,18 @@ public class DAGClientRPCImpl extends DAGClient {
     if(LOG.isDebugEnabled()) {
       LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
     }
-    if(createAMProxyIfNeeded()) {
-      TryKillDAGRequestProto requestProto =
-          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
-      try {
-        proxy.tryKillDAG(null, requestProto);
-      } catch (ServiceException e) {
-        resetProxy(e);
+    try {
+      if (createAMProxyIfNeeded()) {
+        TryKillDAGRequestProto requestProto =
+            TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+        try {
+          proxy.tryKillDAG(null, requestProto);
+        } catch (ServiceException e) {
+          resetProxy(e);
+        }
       }
+    } catch (ApplicationNotFoundException e) {
+      throw new SessionNotRunning("Application already completed");
     }
   }
 
@@ -217,7 +225,8 @@ public class DAGClientRPCImpl extends DAGClient {
     }
   }
 
-  ApplicationReport getAppReport() throws IOException, TezException {
+  ApplicationReport getAppReport() throws IOException, TezException,
+      ApplicationNotFoundException {
     try {
       ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
       if (LOG.isDebugEnabled()) {
@@ -225,12 +234,15 @@ public class DAGClientRPCImpl extends DAGClient {
             + appReport.getYarnApplicationState());
       }
       return appReport;
+    } catch (ApplicationNotFoundException e) {
+      throw e;
     } catch (YarnException e) {
       throw new TezException(e);
     }
   }
 
-  boolean createAMProxyIfNeeded() throws IOException, TezException {
+  boolean createAMProxyIfNeeded() throws IOException, TezException,
+      ApplicationNotFoundException {
     if(proxy != null) {
       // if proxy exist optimistically use it assuming there is no retry
       return true;