You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/26 20:03:33 UTC
tez git commit: TEZ-1967. Add a monitoring API on DAGClient which
returns after a time interval or on DAG final state change. Contributed by
Vasanth kumar RJ.
Repository: tez
Updated Branches:
refs/heads/master 53259aad9 -> 9c8015c49
TEZ-1967. Add a monitoring API on DAGClient which returns after a time
interval or on DAG final state change. Contributed by Vasanth kumar RJ.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c8015c4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c8015c4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c8015c4
Branch: refs/heads/master
Commit: 9c8015c49c8d6059e60c7d29c834348cb27bb6df
Parents: 53259aa
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 26 11:03:05 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 26 11:03:05 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 9 ++
.../apache/tez/dag/api/client/DAGClient.java | 17 ++++
.../tez/dag/api/client/DAGClientImpl.java | 90 ++++++++++++++++++--
.../dag/api/client/DAGClientTimelineImpl.java | 6 ++
.../dag/api/client/rpc/DAGClientRPCImpl.java | 13 ++-
.../src/main/proto/DAGClientAMProtocol.proto | 1 +
.../tez/dag/api/client/rpc/TestDAGClient.java | 10 +--
.../tez/dag/api/client/DAGClientHandler.java | 5 ++
...DAGClientAMProtocolBlockingPBServerImpl.java | 3 +-
.../java/org/apache/tez/dag/app/dag/DAG.java | 3 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 29 +++++++
.../apache/tez/dag/api/client/MRDAGClient.java | 6 ++
13 files changed, 178 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0aae152..1dfe515 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1967. Add a monitoring API on DAGClient which returns after a time interval or on DAG final state change.
TEZ-2130. Send the sessionToken as part of the AM CLC.
TEZ-1935. Organization should be removed from http://tez.apache.org/team-list.html.
TEZ-2009. Change license/copyright headers to 2015.
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 0bf78f9..8186f2a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1155,4 +1155,13 @@ public class TezConfiguration extends Configuration {
static Set<String> getPropertySet() {
return PropertyScope.keySet();
}
+
+ /**
+ * Long value
+ * Status Poll interval in Milliseconds used when getting DAG status with timeout.
+ */
+ @ConfigurationScope(Scope.DAG)
+ public static final String TEZ_DAG_STATUS_POLLINTERVAL_MS = TEZ_PREFIX
+ + "dag.status.pollinterval-ms";
+ public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index d3de10f..13c8ce6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.tez.dag.api.TezException;
@@ -60,6 +61,22 @@ public abstract class DAGClient implements Closeable {
throws IOException, TezException;
/**
+ * Get the status of the specified DAG when it reaches a final state, or the timeout expires.
+ *
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options. To retrieve basic information, this can be null
+ * @param timeout RPC call timeout. Value -1 waits for infinite and returns when
+ * DAG reaches final state
+ * @return DAG Status
+ * @throws IOException
+ * @throws TezException
+ */
+ @Unstable
+ public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout)
+ throws IOException, TezException;
+
+ /**
* Get the status of a Vertex of a DAG
* @param statusOptions Optionally, retrieve additional information based on
* specified options
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 87e64cd..b5bb599 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -69,6 +70,7 @@ public class DAGClientImpl extends DAGClient {
private EnumSet<VertexStatus.State> vertexCompletionStates = EnumSet.of(
VertexStatus.State.SUCCEEDED, VertexStatus.State.FAILED, VertexStatus.State.KILLED,
VertexStatus.State.ERROR);
+ private long statusPollInterval;
public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
@Nullable FrameworkClient frameworkClient) {
@@ -92,6 +94,13 @@ public class DAGClientImpl extends DAGClient {
}
realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
+ statusPollInterval = conf.getLong(
+ TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
+ TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
+ if(statusPollInterval < 0) {
+ LOG.error("DAG Status poll interval cannot be negative and setting to default value.");
+ statusPollInterval = TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT;
+ }
}
@Override
@@ -105,14 +114,77 @@ public class DAGClientImpl extends DAGClient {
}
@Override
- public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
- TezException, IOException {
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ final long timeout) throws TezException, IOException {
+ long currentStatusPollInterval = statusPollInterval;
+ if(timeout >= 0 && currentStatusPollInterval > timeout) {
+ currentStatusPollInterval = timeout;
+ }
+ DAGStatus dagStatus = null;
+ if(cachedDagStatus != null) {
+ dagStatus = cachedDagStatus;
+ } else {
+ dagStatus = getDAGStatus(statusOptions);
+ }
+ //Handling when client dag status init or submitted
+ if (dagStatus.getState() == DAGStatus.State.INITING
+ || dagStatus.getState() == DAGStatus.State.SUBMITTED) {
+ boolean initOrSubmittedState = true;
+ long timeoutTime = System.currentTimeMillis() + timeout;
+ while (timeout < 0
+ || (timeout > 0 && timeoutTime > System.currentTimeMillis())) {
+ if (initOrSubmittedState) {
+ dagStatus = getDAGStatus(statusOptions);
+ }
+ if (dagStatus.getState() == DAGStatus.State.RUNNING) {
+ initOrSubmittedState = false;
+ // When RUNNING State, Check for AM status is also RUNNING
+ DAGStatus dagStatusFromAM = getDAGStatusViaAM(statusOptions, 0);
+ if (dagStatusFromAM != null) {
+ if (dagStatusFromAM.getState() == DAGStatus.State.RUNNING) {
+ long remainingTimeout = 0;
+ if (timeout <= 0) {
+ remainingTimeout = timeout;
+ } else {
+ if (timeoutTime > System.currentTimeMillis()) {
+ remainingTimeout = timeoutTime - System.currentTimeMillis();
+ } else {
+ return dagStatusFromAM;
+ }
+ }
+ dagStatus = getDAGStatusInternal(statusOptions, remainingTimeout);
+ } else {
+ dagStatus = dagStatusFromAM;
+ }
+ break;
+ }
+ }
+ if(dagStatus.getState() == DAGStatus.State.SUCCEEDED
+ || dagStatus.getState() == DAGStatus.State.FAILED
+ || dagStatus.getState() == DAGStatus.State.KILLED
+ || dagStatus.getState() == DAGStatus.State.ERROR) {
+ break;
+ }
+ try {
+ Thread.sleep(currentStatusPollInterval);
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+ }// End of while
+ return dagStatus;
+ } else {
+ return getDAGStatusInternal(statusOptions, timeout);
+ }
+ }
+
+ private DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws TezException, IOException {
if (!dagCompleted) {
// fetch from AM. on Error and while DAG is still not completed (could not reach AM, AM got
// killed). return cached status. This prevents the progress being reset (for ex fetching from
// RM does not give status).
- final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+ final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
if (!dagCompleted) {
if (dagStatus != null) {
@@ -155,6 +227,12 @@ public class DAGClientImpl extends DAGClient {
}
@Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
+ TezException, IOException {
+ return getDAGStatusInternal(statusOptions, 0);
+ }
+
+ @Override
public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
IOException, TezException {
@@ -228,11 +306,11 @@ public class DAGClientImpl extends DAGClient {
}
}
- private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions) throws
- IOException {
+ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException {
DAGStatus dagStatus = null;
try {
- dagStatus = realClient.getDAGStatus(statusOptions);
+ dagStatus = realClient.getDAGStatus(statusOptions, timeout);
} catch (DAGNotRunningException e) {
dagCompleted = true;
} catch (TezException e) {
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index ce88c93..702eead 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -505,4 +505,10 @@ public class DAGClientTimelineImpl extends DAGClient {
}
}
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException, TezException {
+ return getDAGStatus(statusOptions);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 2ac89d5..74e3b53 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -81,9 +81,16 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
throws IOException, TezException {
+ return getDAGStatus(statusOptions, 0);
+ }
+
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException, TezException {
if(createAMProxyIfNeeded()) {
try {
- DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+ DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
return dagStatus;
} catch (TezException e) {
resetProxy(e); // create proxy again
@@ -149,14 +156,14 @@ public class DAGClientRPCImpl extends DAGClient {
proxy = null;
}
- DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions)
+ DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
throws IOException, TezException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
}
GetDAGStatusRequestProto.Builder requestProtoBuilder =
GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagId);
+ .setDagId(dagId).setTimeout(timeout);
if (statusOptions != null) {
requestProtoBuilder.addAllStatusOptions(
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index aa0d938..143cded 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -35,6 +35,7 @@ message GetAllDAGsResponseProto {
message GetDAGStatusRequestProto {
optional string dagId = 1;
repeated StatusGetOptsProto statusOptions = 3;
+ optional int64 timeout = 4;
}
message GetDAGStatusResponseProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index bad0d2e..c6894ef 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -207,13 +207,13 @@ public class TestDAGClient {
public void testDAGStatus() throws Exception{
DAGStatus resultDagStatus = dagClient.getDAGStatus(null);
verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).build());
+ .setDagId(dagIdStr).setTimeout(0).build());
assertEquals(new DAGStatus(dagStatusProtoWithoutCounters), resultDagStatus);
System.out.println("DAGStatusWithoutCounter:" + resultDagStatus);
resultDagStatus = dagClient.getDAGStatus(Sets.newSet(StatusGetOpts.GET_COUNTERS));
verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+ .setDagId(dagIdStr).setTimeout(0).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
assertEquals(new DAGStatus(dagStatusProtoWithCounters), resultDagStatus);
System.out.println("DAGStatusWithCounter:" + resultDagStatus);
}
@@ -252,7 +252,7 @@ public class TestDAGClient {
dagClient.waitForCompletion();
verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).build());
+ .setDagId(dagIdStr).setTimeout(0).build());
}
@Test(timeout = 5000)
@@ -271,7 +271,7 @@ public class TestDAGClient {
// second & third time for check completion
dagClient.waitForCompletionWithStatusUpdates(null);
verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).build());
+ .setDagId(dagIdStr).setTimeout(0).build());
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
@@ -282,7 +282,7 @@ public class TestDAGClient {
.build());
dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS));
verify(mockProxy, times(3)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+ .setDagId(dagIdStr).setTimeout(0).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index d14ed2a..e40b208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -62,6 +62,11 @@ public class DAGClientHandler {
return getDAG(dagIdStr).getDAGStatus(statusOptions);
}
+ public DAGStatus getDAGStatus(String dagIdStr,
+ Set<StatusGetOpts> statusOptions, long timeout) throws TezException {
+ return getDAG(dagIdStr).getDAGStatus(statusOptions, timeout);
+ }
+
public VertexStatus getVertexStatus(String dagIdStr, String vertexName,
Set<StatusGetOpts> statusOptions) throws TezException {
VertexStatus status =
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index c054305..4c29b79 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -91,13 +91,14 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
UserGroupInformation user = getRPCUser();
try {
String dagId = request.getDagId();
+ long timeout = request.getTimeout();
if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform DAG view operation");
}
DAGStatus status;
status = real.getDAGStatus(dagId,
DagTypeConverters.convertStatusGetOptsFromProto(
- request.getStatusOptionsList()));
+ request.getStatusOptionsList()), timeout);
assert status instanceof DAGStatusBuilder;
DAGStatusBuilder builder = (DAGStatusBuilder) status;
return GetDAGStatusResponseProto.newBuilder().
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8677015..1b64754 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
@@ -72,6 +73,8 @@ public interface DAG {
DAGPlan getJobPlan();
DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions);
+ DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions, long timeout)
+ throws TezException;
VertexStatusBuilder getVertexStatus(String vertexName,
Set<StatusGetOpts> statusOptions);
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index dd38c29..d1dcfb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -386,6 +387,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@VisibleForTesting
boolean recoveryCommitInProgress = false;
Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
+ long statusPollInterval;
static class VertexGroupInfo {
String groupName;
@@ -464,6 +466,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// instance variable.
stateMachine = stateMachineFactory.make(this);
this.entityUpdateTracker = new StateChangeNotifier(this);
+ statusPollInterval = dagConf.getLong(
+ TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS,
+ TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT);
+ if(statusPollInterval < 0) {
+ LOG.error("DAG Status poll interval cannot be negative and setting to default value.");
+ statusPollInterval = TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT;
+ }
}
protected StateMachine<DAGState, DAGEventType, DAGEvent> getStateMachine() {
@@ -730,6 +739,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
+ public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions,
+ long timeout) throws TezException {
+ long currentStatusPollInterval = statusPollInterval;
+ if(timeout >= 0 && currentStatusPollInterval > timeout) {
+ currentStatusPollInterval = timeout;
+ }
+ long timeoutTime = System.currentTimeMillis() + timeout;
+ while (timeout < 0 || (timeout > 0 && timeoutTime > System.currentTimeMillis())) {
+ if(isComplete()) {
+ break;
+ }
+ try {
+ Thread.sleep(currentStatusPollInterval);
+ } catch (InterruptedException e) {
+ throw new TezException(e);
+ }
+ }
+ return getDAGStatus(statusOptions);
+ }
+
private ProgressBuilder getDAGProgress() {
int totalTaskCount = 0;
int totalSucceededTaskCount = 0;
http://git-wip-us.apache.org/repos/asf/tez/blob/9c8015c4/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
index e05f25c..d743feb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java
@@ -85,4 +85,10 @@ public class MRDAGClient extends DAGClient {
public void close() throws IOException {
realClient.close();
}
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException, TezException {
+ return getDAGStatus(statusOptions);
+ }
}