You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/03/03 21:19:27 UTC
tez git commit: TEZ-3156. Tez client keeps trying to talk to RM even
if RM does not know about the application. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master 3f5a7f35d -> 2af886b50
TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2af886b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2af886b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2af886b5
Branch: refs/heads/master
Commit: 2af886b509015200e1c04527275474cbc771c667
Parents: 3f5a7f3
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 3 12:19:12 2016 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 3 12:19:12 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/client/DAGClient.java | 1 -
.../tez/dag/api/client/DAGClientImpl.java | 24 +++-
.../tez/dag/api/client/DAGClientInternal.java | 116 +++++++++++++++++++
.../dag/api/client/DAGClientTimelineImpl.java | 4 +-
.../dag/api/client/rpc/DAGClientRPCImpl.java | 40 ++++---
6 files changed, 169 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a7ae58..0c6cacc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
TEZ-3115. Shuffle string handling adds significant memory overhead
TEZ-3151. Expose DAG credentials to plugins.
TEZ-3149. Tez-tools: Add username in DagInfo.
@@ -396,6 +397,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
TEZ-3115. Shuffle string handling adds significant memory overhead
TEZ-3149. Tez-tools: Add username in DagInfo.
TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 27b316b..9b11b96 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.tez.dag.api.TezException;
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index fa22c32..af67ee8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -30,6 +30,8 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -58,7 +60,7 @@ public class DAGClientImpl extends DAGClient {
private final FrameworkClient frameworkClient;
@VisibleForTesting
- protected DAGClient realClient;
+ protected DAGClientInternal realClient;
private boolean dagCompleted = false;
@VisibleForTesting
protected boolean isATSEnabled = false;
@@ -240,6 +242,9 @@ public class DAGClientImpl extends DAGClient {
if (dagStatus.isCompleted()) {
return dagStatus;
}
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline"
+ + " - Application not found by YARN", e);
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.info("DAGStatus fetch failed." + e.getMessage());
@@ -291,6 +296,10 @@ public class DAGClientImpl extends DAGClient {
if (vertexCompletionStates.contains(vertexStatus.getState())) {
return vertexStatus;
}
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline"
+ + " - Application not found by YARN", e);
+ return null;
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage());
@@ -350,6 +359,10 @@ public class DAGClientImpl extends DAGClient {
try {
dagStatus = realClient.getDAGStatus(statusOptions, timeout);
} catch (DAGNotRunningException e) {
+ LOG.info("DAG is no longer running", e);
+ dagCompleted = true;
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue or due to AM completed.
@@ -370,6 +383,10 @@ public class DAGClientImpl extends DAGClient {
try {
vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
} catch (DAGNotRunningException e) {
+ LOG.info("DAG is no longer running", e);
+ dagCompleted = true;
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue or due to AM completed.
@@ -398,6 +415,9 @@ public class DAGClientImpl extends DAGClient {
ApplicationReport appReport;
try {
appReport = frameworkClient.getApplicationReport(appId);
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
+ throw new DAGNotRunningException(e);
} catch (YarnException e) {
throw new TezException(e);
}
@@ -592,7 +612,7 @@ public class DAGClientImpl extends DAGClient {
}
@VisibleForTesting
- public DAGClient getRealClient() {
+ public DAGClientInternal getRealClient() {
return realClient;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
new file mode 100644
index 0000000..bb236a3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * Private internal class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Application Master.
+ */
+@Private
+public abstract class DAGClientInternal implements Closeable {
+
+ /**
+ * Gets DAG execution context for use with logging
+ * @return summary of DAG execution
+ */
+ public abstract String getExecutionContext();
+
+ @Private
+ /**
+ * Get the YARN ApplicationReport for the app running the DAG. For performance
+ * reasons this may be a stale copy and should be used only to access static info. It
+ * may be null.
+ * @return <code>ApplicationReport</code> or null
+ */
+ protected abstract ApplicationReport getApplicationReportInternal();
+
+ /**
+ * Get the status of the specified DAG
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options. To retrieve basic information, this can be null
+ */
+ public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Get the status of the specified DAG when it reaches a final state, or the timeout expires.
+ *
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options. To retrieve basic information, this can be null
+ * @param timeout RPC call timeout. A value of -1 waits indefinitely and returns
+ * when the DAG reaches a final state
+ * @return DAG Status
+ * @throws IOException
+ * @throws TezException
+ */
+ public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Get the status of a Vertex of a DAG
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options
+ */
+ public abstract VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Kill a running DAG
+ *
+ */
+ public abstract void tryKillDAG() throws IOException, TezException;
+
+ /**
+ * Wait for DAG to complete without printing any vertex statuses
+ *
+ * @return Final DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException;
+
+ /**
+ * Wait for DAG to complete and periodically print *all* vertices' status.
+ *
+ * @param statusGetOpts
+ * : status get options. For example, to get counters, pass
+ * <code>EnumSet.of(StatusGetOpts.GET_COUNTERS)</code>
+ * @return Final DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
+ throws IOException, TezException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 2294759..ffd91b7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -37,6 +37,8 @@ import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
@Private
-public class DAGClientTimelineImpl extends DAGClient {
+public class DAGClientTimelineImpl extends DAGClientInternal {
private static final Logger LOG = LoggerFactory.getLogger(DAGClientTimelineImpl.class);
private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 240289c..ff48755 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,7 +23,10 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.tez.common.RPCUtil;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.client.DAGClientInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -51,7 +54,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
@Private
-public class DAGClientRPCImpl extends DAGClient {
+public class DAGClientRPCImpl extends DAGClientInternal {
private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class);
@@ -82,15 +85,15 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
- throws IOException, TezException {
+ throws IOException, TezException, ApplicationNotFoundException {
return getDAGStatus(statusOptions, 0);
}
@Override
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
- long timeout) throws IOException, TezException {
- if(createAMProxyIfNeeded()) {
+ long timeout) throws IOException, TezException, ApplicationNotFoundException {
+ if (createAMProxyIfNeeded()) {
try {
DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
return dagStatus;
@@ -110,7 +113,8 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public VertexStatus getVertexStatus(String vertexName,
Set<StatusGetOpts> statusOptions)
- throws IOException, TezException {
+ throws IOException, TezException, ApplicationNotFoundException {
+
if(createAMProxyIfNeeded()) {
try {
return getVertexStatusViaAM(vertexName, statusOptions);
@@ -133,14 +137,18 @@ public class DAGClientRPCImpl extends DAGClient {
if(LOG.isDebugEnabled()) {
LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
}
- if(createAMProxyIfNeeded()) {
- TryKillDAGRequestProto requestProto =
- TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
- try {
- proxy.tryKillDAG(null, requestProto);
- } catch (ServiceException e) {
- resetProxy(e);
+ try {
+ if (createAMProxyIfNeeded()) {
+ TryKillDAGRequestProto requestProto =
+ TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+ try {
+ proxy.tryKillDAG(null, requestProto);
+ } catch (ServiceException e) {
+ resetProxy(e);
+ }
}
+ } catch (ApplicationNotFoundException e) {
+ throw new SessionNotRunning("Application already completed");
}
}
@@ -217,7 +225,8 @@ public class DAGClientRPCImpl extends DAGClient {
}
}
- ApplicationReport getAppReport() throws IOException, TezException {
+ ApplicationReport getAppReport() throws IOException, TezException,
+ ApplicationNotFoundException {
try {
ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
if (LOG.isDebugEnabled()) {
@@ -225,12 +234,15 @@ public class DAGClientRPCImpl extends DAGClient {
+ appReport.getYarnApplicationState());
}
return appReport;
+ } catch (ApplicationNotFoundException e) {
+ throw e;
} catch (YarnException e) {
throw new TezException(e);
}
}
- boolean createAMProxyIfNeeded() throws IOException, TezException {
+ boolean createAMProxyIfNeeded() throws IOException, TezException,
+ ApplicationNotFoundException {
if(proxy != null) {
// if proxy exist optimistically use it assuming there is no retry
return true;