You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/10/02 20:39:10 UTC
git commit: TEZ-397. Add support for getting status of a tez session.
(hitesh)
Updated Branches:
refs/heads/master 095ffeff4 -> 034e661a3
TEZ-397. Add support for getting status of a tez session. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/034e661a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/034e661a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/034e661a
Branch: refs/heads/master
Commit: 034e661a306308508194feaf2848cceb6da2f99b
Parents: 095ffef
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Oct 2 11:38:31 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Oct 2 11:38:31 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 1 -
.../java/org/apache/tez/client/TezSession.java | 43 +++++++++
.../org/apache/tez/client/TezSessionStatus.java | 30 ++++++
.../apache/tez/dag/api/DagTypeConverters.java | 96 +++++++++++++-------
.../src/main/proto/DAGClientAMProtocol.proto | 15 +++
...DAGClientAMProtocolBlockingPBServerImpl.java | 17 ++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 23 +++++
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 13 +++
8 files changed, 206 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index ff07142..93d51d1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -558,5 +558,4 @@ public class TezClientUtils {
return proxy;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index c3a6e75..ebdc20f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -25,12 +25,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -38,11 +40,15 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import sun.security.provider.certpath.OCSPResponse.ResponseStatus;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
@@ -228,4 +234,41 @@ public class TezSession {
public synchronized ApplicationId getApplicationId() {
return applicationId;
}
+
+ public TezSessionStatus getSessionStatus() throws TezException, IOException { // Derives the session status from the YARN application report, asking the AM over RPC once YARN reports RUNNING.
+ try {
+ ApplicationReport appReport = yarnClient.getApplicationReport(
+ applicationId);
+ switch (appReport.getYarnApplicationState()) {
+ case NEW:
+ case NEW_SAVING:
+ case ACCEPTED:
+ case SUBMITTED: // application has not reached RUNNING in YARN yet
+ return TezSessionStatus.INITIALIZING;
+ case FINISHED:
+ case FAILED:
+ case KILLED: // FINISHED, FAILED and KILLED are all reported as SHUTDOWN
+ return TezSessionStatus.SHUTDOWN;
+ case RUNNING: // the YARN state alone cannot tell READY from RUNNING, so query the AM itself
+ try {
+ DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getSessionAMProxy(
+ yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
+ if (proxy == null) { // NOTE(review): presumably means the AM is not reachable yet - confirm against getSessionAMProxy
+ return TezSessionStatus.INITIALIZING;
+ }
+ GetAMStatusResponseProto response = proxy.getAMStatus(null,
+ GetAMStatusRequestProto.newBuilder().build());
+ return DagTypeConverters.convertTezSessionStatusFromProto(
+ response.getStatus());
+ } catch (TezException e) { // logged and not rethrown: control falls through to the final INITIALIZING return
+ LOG.info("Failed to retrieve AM Status via proxy", e);
+ } catch (ServiceException e) { // RPC failures get the same best-effort handling
+ LOG.info("Failed to retrieve AM Status via proxy", e);
+ }
+ }
+ } catch (YarnException e) { // a failure to fetch the application report is surfaced to the caller as TezException
+ throw new TezException(e);
+ }
+ return TezSessionStatus.INITIALIZING; // reached after a logged proxy/RPC failure, or for a YARN state with no case above
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
new file mode 100644
index 0000000..3d95482
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+public enum TezSessionStatus { // Client-visible session states; mirrored on the wire by TezSessionStatusProto.
+ /** Session is still initializing; also reported by TezSession while the YARN application has not reached RUNNING. */
+ INITIALIZING,
+ /** Session is ready to receive DAG submissions. */
+ READY,
+ /** Session is currently running a DAG. */
+ RUNNING,
+ /** Session has shut down or is in the process of shutting down. */
+ SHUTDOWN
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index f14473b..9c72c3d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -31,10 +31,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.NamedDescriptorProto;
@@ -51,79 +53,78 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import com.google.protobuf.ByteString;
-
public class DagTypeConverters {
-
+
public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
switch(visibility){
- case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;
+ case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;
case PRIVATE : return PlanLocalResourceVisibility.PRIVATE;
case APPLICATION : return PlanLocalResourceVisibility.APPLICATION;
default : throw new RuntimeException("unknown 'visibility': " + visibility);
}
}
-
+
public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
switch(visibility){
- case PUBLIC : return LocalResourceVisibility.PUBLIC;
+ case PUBLIC : return LocalResourceVisibility.PUBLIC;
case PRIVATE : return LocalResourceVisibility.PRIVATE;
case APPLICATION : return LocalResourceVisibility.APPLICATION;
default : throw new RuntimeException("unknown 'visibility': " + visibility);
}
}
-
+
public static PlanEdgeDataSourceType convertToDAGPlan(DataSourceType sourceType){
switch(sourceType){
- case PERSISTED : return PlanEdgeDataSourceType.PERSISTED;
+ case PERSISTED : return PlanEdgeDataSourceType.PERSISTED;
case PERSISTED_RELIABLE : return PlanEdgeDataSourceType.PERSISTED_RELIABLE;
case EPHEMERAL : return PlanEdgeDataSourceType.EPHEMERAL;
default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
}
}
-
+
public static DataSourceType convertFromDAGPlan(PlanEdgeDataSourceType sourceType){
switch(sourceType){
- case PERSISTED : return DataSourceType.PERSISTED;
+ case PERSISTED : return DataSourceType.PERSISTED;
case PERSISTED_RELIABLE : return DataSourceType.PERSISTED_RELIABLE;
case EPHEMERAL : return DataSourceType.EPHEMERAL;
default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
}
}
-
+
public static PlanEdgeDataMovementType convertToDAGPlan(DataMovementType type){
switch(type){
- case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;
+ case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;
case BROADCAST : return PlanEdgeDataMovementType.BROADCAST;
case SCATTER_GATHER : return PlanEdgeDataMovementType.SCATTER_GATHER;
default : throw new RuntimeException("unknown 'dataMovementType': " + type);
}
}
-
+
public static DataMovementType convertFromDAGPlan(PlanEdgeDataMovementType type){
switch(type){
- case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;
+ case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;
case BROADCAST : return DataMovementType.BROADCAST;
case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER;
default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
}
}
-
+
public static PlanEdgeSchedulingType convertToDAGPlan(SchedulingType type){
switch(type){
- case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL;
+ case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL;
case CONCURRENT : return PlanEdgeSchedulingType.CONCURRENT;
default : throw new RuntimeException("unknown 'SchedulingType': " + type);
}
}
-
+
public static SchedulingType convertFromDAGPlan(PlanEdgeSchedulingType type){
switch(type){
- case SEQUENTIAL : return SchedulingType.SEQUENTIAL;
+ case SEQUENTIAL : return SchedulingType.SEQUENTIAL;
case CONCURRENT : return SchedulingType.CONCURRENT;
default : throw new IllegalArgumentException("unknown 'SchedulingType': " + type);
}
}
-
+
public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
switch(type){
case ARCHIVE : return PlanLocalResourceType.ARCHIVE;
@@ -132,7 +133,7 @@ public class DagTypeConverters {
default : throw new IllegalArgumentException("unknown 'type': " + type);
}
}
-
+
public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
switch(type){
case ARCHIVE : return LocalResourceType.ARCHIVE;
@@ -145,8 +146,8 @@ public class DagTypeConverters {
public static VertexLocationHint convertFromDAGPlan(
List<PlanTaskLocationHint> locationHints) {
- List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();
-
+ List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();
+
for(PlanTaskLocationHint inputHint : locationHints){
TaskLocationHint outputHint = new TaskLocationHint(
new HashSet<String>(inputHint.getHostList()),
@@ -157,14 +158,14 @@ public class DagTypeConverters {
}
// notes re HDFS URL handling:
- // Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
+ // Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
// org.apache.hadoop.fs.Path.Path is actually a URI type that allows any scheme
// org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN.
// java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
-
+
public static String convertToDAGPlan(URL resource) {
// see above notes on HDFS URL handling
- String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
+ String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
+ resource.getFile();
return out;
}
@@ -174,7 +175,7 @@ public class DagTypeConverters {
Map<String, LocalResource> map = new HashMap<String, LocalResource>();
for(PlanLocalResource res : localResourcesList){
LocalResource r = new LocalResourcePBImpl();
-
+
//NOTE: have to check every optional field in protobuf generated classes for existence before accessing
//else we will receive a default value back, eg ""
if(res.hasPattern()){
@@ -191,16 +192,16 @@ public class DagTypeConverters {
}
public static Map<String, String> createEnvironmentMapFromDAGPlan(
- List<PlanKeyValuePair> environmentSettingList) {
-
+ List<PlanKeyValuePair> environmentSettingList) {
+
Map<String, String> map = new HashMap<String, String>();
for(PlanKeyValuePair setting : environmentSettingList){
map.put(setting.getKey(), setting.getValue());
}
-
+
return map;
}
-
+
public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
Map<String, EdgePlan> edgePlanMap =
new HashMap<String, EdgePlan>();
@@ -209,7 +210,7 @@ public class DagTypeConverters {
}
return edgePlanMap;
}
-
+
public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
return new EdgeProperty(
convertFromDAGPlan(edge.getDataMovementType()),
@@ -246,7 +247,7 @@ public class DagTypeConverters {
}
return builder.build();
}
-
+
public static NamedDescriptorProto convertToDAGPlan(
NamedDescriptor<? extends TezEntityDescriptor> descriptor) {
NamedDescriptorProto.Builder builder = NamedDescriptorProto.newBuilder();
@@ -284,4 +285,37 @@ public class DagTypeConverters {
}
return new ProcessorDescriptor(className).setUserPayload(bb);
}
+
+ public static TezSessionStatus convertTezSessionStatusFromProto( // Maps each TezSessionStatusProto value to the TezSessionStatus constant of the same name.
+ TezSessionStatusProto proto) {
+ switch (proto) {
+ case INITIALIZING:
+ return TezSessionStatus.INITIALIZING;
+ case READY:
+ return TezSessionStatus.READY;
+ case RUNNING:
+ return TezSessionStatus.RUNNING;
+ case SHUTDOWN:
+ return TezSessionStatus.SHUTDOWN;
+ }
+ throw new TezUncheckedException("Could not convert to TezSessionStatus from" // only reached for a proto value with no case above
+ + " proto");
+ }
+
+ public static TezSessionStatusProto convertTezSessionStatusToProto( // Inverse of convertTezSessionStatusFromProto: maps each TezSessionStatus to the proto value of the same name.
+ TezSessionStatus status) {
+ switch (status) {
+ case INITIALIZING:
+ return TezSessionStatusProto.INITIALIZING;
+ case READY:
+ return TezSessionStatusProto.READY;
+ case RUNNING:
+ return TezSessionStatusProto.RUNNING;
+ case SHUTDOWN:
+ return TezSessionStatusProto.SHUTDOWN;
+ }
+ throw new TezUncheckedException("Could not convert TezSessionStatus to" // only reached for a status value with no case above
+ + " proto");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 6fcd1f8..1236190 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -71,6 +71,20 @@ message ShutdownSessionRequestProto {
message ShutdownSessionResponseProto {
}
+enum TezSessionStatusProto { // Wire form of org.apache.tez.client.TezSessionStatus; DagTypeConverters maps the values by name.
+ INITIALIZING = 0;
+ READY = 1;
+ RUNNING = 2;
+ SHUTDOWN = 3;
+}
+
+message GetAMStatusRequestProto { // Request for the getAMStatus rpc; carries no fields.
+}
+
+message GetAMStatusResponseProto { // Response for the getAMStatus rpc.
+ required TezSessionStatusProto status = 1; // session status as reported by the AM
+}
+
service DAGClientAMProtocol {
rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
@@ -78,4 +92,5 @@ service DAGClientAMProtocol {
rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
+ rpc getAMStatus (GetAMStatusRequestProto) returns (GetAMStatusResponseProto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 1b6d562..eb1ff48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -20,12 +20,16 @@ package org.apache.tez.dag.api.client.rpc;
import java.util.List;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
@@ -131,4 +135,17 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
return ShutdownSessionResponseProto.newBuilder().build();
}
+ @Override
+ public GetAMStatusResponseProto getAMStatus(RpcController controller, // Server side of the getAMStatus rpc; neither controller nor request is read.
+ GetAMStatusRequestProto request) throws ServiceException {
+ try {
+ TezSessionStatus sessionStatus = real.getSessionStatus(); // delegate to the wrapped handler
+ return GetAMStatusResponseProto.newBuilder().setStatus(
+ DagTypeConverters.convertTezSessionStatusToProto(sessionStatus))
+ .build();
+ } catch(TezException e) { // NOTE(review): wrapException presumably converts this to a ServiceException - confirm
+ throw wrapException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 85ebbfa..8d6fd1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -750,6 +751,28 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Received message to shutdown AM");
shutdownTezAM();
}
+
+ public synchronized TezSessionStatus getSessionStatus() throws TezException { // Translates the AM's current state into a TezSessionStatus; throws TezException unless isSession is set.
+ if (!isSession) {
+ throw new TezException("Unsupported operation as AM not running in"
+ + " session mode");
+ }
+ switch (state) {
+ case NEW:
+ case INITED:
+ return TezSessionStatus.INITIALIZING;
+ case IDLE: // IDLE is what clients see as READY
+ return TezSessionStatus.READY;
+ case RUNNING:
+ return TezSessionStatus.RUNNING;
+ case ERROR:
+ case FAILED:
+ case SUCCEEDED:
+ case KILLED: // ERROR, FAILED, SUCCEEDED and KILLED are all reported as SHUTDOWN
+ return TezSessionStatus.SHUTDOWN;
+ }
+ return TezSessionStatus.INITIALIZING; // fallback for any state value without a case above
+ }
}
private class RunningAppContext implements AppContext {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index e2f2856..d9d56bf 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -54,6 +54,7 @@ import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -243,15 +244,25 @@ public class TestMRRJobsDAGApi {
new TezSessionConfiguration(amConfig, tezConf);
TezSession tezSession = new TezSession("testsession", tezSessionConfig);
tezSession.start();
+ Assert.assertEquals(TezSessionStatus.INITIALIZING,
+ tezSession.getSessionStatus());
State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
finalState = testMRRSleepJobDagSubmitCore(true, false, false,
tezSession);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ Assert.assertEquals(TezSessionStatus.READY,
+ tezSession.getSessionStatus());
+
ApplicationId appId = tezSession.getApplicationId();
tezSession.stop();
+ Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+ tezSession.getSessionStatus());
+
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(mrrTezCluster.getConfig());
yarnClient.start();
@@ -533,6 +544,8 @@ public class TestMRRJobsDAGApi {
LOG.info("Submitting dag to tez session with appId="
+ tezSession.getApplicationId());
dagClient = tezSession.submitDAG(dag);
+ Assert.assertEquals(TezSessionStatus.RUNNING,
+ tezSession.getSessionStatus());
}
DAGStatus dagStatus = dagClient.getDAGStatus();
while (!dagStatus.isCompleted()) {