You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/10/02 20:39:10 UTC

git commit: TEZ-397. Add support for getting status of a tez session. (hitesh)

Updated Branches:
  refs/heads/master 095ffeff4 -> 034e661a3


TEZ-397. Add support for getting status of a tez session. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/034e661a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/034e661a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/034e661a

Branch: refs/heads/master
Commit: 034e661a306308508194feaf2848cceb6da2f99b
Parents: 095ffef
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Oct 2 11:38:31 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Oct 2 11:38:31 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  1 -
 .../java/org/apache/tez/client/TezSession.java  | 43 +++++++++
 .../org/apache/tez/client/TezSessionStatus.java | 30 ++++++
 .../apache/tez/dag/api/DagTypeConverters.java   | 96 +++++++++++++-------
 .../src/main/proto/DAGClientAMProtocol.proto    | 15 +++
 ...DAGClientAMProtocolBlockingPBServerImpl.java | 17 ++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 23 +++++
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 13 +++
 8 files changed, 206 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index ff07142..93d51d1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -558,5 +558,4 @@ public class TezClientUtils {
     return proxy;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index c3a6e75..ebdc20f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -25,12 +25,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -38,11 +40,15 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
+import sun.security.provider.certpath.OCSPResponse.ResponseStatus;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
@@ -228,4 +234,41 @@ public class TezSession {
   public synchronized ApplicationId getApplicationId() {
     return applicationId;
   }
+
+  /** Returns session status from YARN state, or from the AM once RUNNING. */
+  public TezSessionStatus getSessionStatus() throws TezException, IOException {
+    try {
+      ApplicationReport report = yarnClient.getApplicationReport(applicationId);
+      switch (report.getYarnApplicationState()) {
+      case FINISHED:
+      case FAILED:
+      case KILLED:
+        return TezSessionStatus.SHUTDOWN;
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        return TezSessionStatus.INITIALIZING;
+      case RUNNING:
+        try {
+          DAGClientAMProtocolBlockingPB amProxy = TezClientUtils.getSessionAMProxy(
+              yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
+          if (amProxy == null) {
+            return TezSessionStatus.INITIALIZING;
+          }
+          return DagTypeConverters.convertTezSessionStatusFromProto(
+              amProxy.getAMStatus(null,
+                  GetAMStatusRequestProto.newBuilder().build()).getStatus());
+        } catch (TezException e) {
+          LOG.info("Failed to retrieve AM Status via proxy", e);
+        } catch (ServiceException e) {
+          LOG.info("Failed to retrieve AM Status via proxy", e);
+        }
+      }
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    // Reached when the AM query failed or the YARN state matched no case above.
+    return TezSessionStatus.INITIALIZING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
new file mode 100644
index 0000000..3d95482
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionStatus.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+public enum TezSessionStatus { // status values reported for a Tez session
+  /** Session is initializing itself. */
+  INITIALIZING,
+  /** Session is ready to receive DAG submissions. */
+  READY,
+  /** Session is currently running a DAG. */
+  RUNNING,
+  /** Session has shut down or is in the process of shutting down. */
+  SHUTDOWN
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index f14473b..9c72c3d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -31,10 +31,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.NamedDescriptorProto;
@@ -51,79 +53,78 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 
 import com.google.protobuf.ByteString;
 
-
 public class DagTypeConverters {
-  
+
   public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
     switch(visibility){
-      case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;  
+      case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;
       case PRIVATE : return PlanLocalResourceVisibility.PRIVATE;
       case APPLICATION : return PlanLocalResourceVisibility.APPLICATION;
       default : throw new RuntimeException("unknown 'visibility': " + visibility);
     }
   }
-  
+
   public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
     switch(visibility){
-      case PUBLIC : return LocalResourceVisibility.PUBLIC;  
+      case PUBLIC : return LocalResourceVisibility.PUBLIC;
       case PRIVATE : return LocalResourceVisibility.PRIVATE;
       case APPLICATION : return LocalResourceVisibility.APPLICATION;
       default : throw new RuntimeException("unknown 'visibility': " + visibility);
     }
   }
-  
+
   public static PlanEdgeDataSourceType convertToDAGPlan(DataSourceType sourceType){
     switch(sourceType){
-      case PERSISTED : return PlanEdgeDataSourceType.PERSISTED;  
+      case PERSISTED : return PlanEdgeDataSourceType.PERSISTED;
       case PERSISTED_RELIABLE : return PlanEdgeDataSourceType.PERSISTED_RELIABLE;
       case EPHEMERAL :  return PlanEdgeDataSourceType.EPHEMERAL;
       default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
     }
   }
-  
+
   public static DataSourceType convertFromDAGPlan(PlanEdgeDataSourceType sourceType){
     switch(sourceType){
-      case PERSISTED : return DataSourceType.PERSISTED;  
+      case PERSISTED : return DataSourceType.PERSISTED;
       case PERSISTED_RELIABLE : return DataSourceType.PERSISTED_RELIABLE;
       case EPHEMERAL :  return DataSourceType.EPHEMERAL;
       default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
     }
   }
-  
+
   public static PlanEdgeDataMovementType convertToDAGPlan(DataMovementType type){
     switch(type){
-      case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;  
+      case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;
       case BROADCAST : return PlanEdgeDataMovementType.BROADCAST;
       case SCATTER_GATHER : return PlanEdgeDataMovementType.SCATTER_GATHER;
       default : throw new RuntimeException("unknown 'dataMovementType': " + type);
     }
   }
-  
+
   public static DataMovementType convertFromDAGPlan(PlanEdgeDataMovementType type){
     switch(type){
-      case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;  
+      case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;
       case BROADCAST : return DataMovementType.BROADCAST;
       case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER;
       default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
     }
   }
-  
+
   public static PlanEdgeSchedulingType convertToDAGPlan(SchedulingType type){
     switch(type){
-      case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL;  
+      case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL;
       case CONCURRENT : return PlanEdgeSchedulingType.CONCURRENT;
       default : throw new RuntimeException("unknown 'SchedulingType': " + type);
     }
   }
-  
+
   public static SchedulingType convertFromDAGPlan(PlanEdgeSchedulingType type){
     switch(type){
-      case SEQUENTIAL : return SchedulingType.SEQUENTIAL;  
+      case SEQUENTIAL : return SchedulingType.SEQUENTIAL;
       case CONCURRENT : return SchedulingType.CONCURRENT;
       default : throw new IllegalArgumentException("unknown 'SchedulingType': " + type);
     }
   }
-  
+
   public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
     switch(type){
     case ARCHIVE : return PlanLocalResourceType.ARCHIVE;
@@ -132,7 +133,7 @@ public class DagTypeConverters {
     default : throw new IllegalArgumentException("unknown 'type': " + type);
     }
   }
-  
+
   public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
     switch(type){
     case ARCHIVE : return LocalResourceType.ARCHIVE;
@@ -145,8 +146,8 @@ public class DagTypeConverters {
   public static VertexLocationHint convertFromDAGPlan(
       List<PlanTaskLocationHint> locationHints) {
 
-    List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();  
-    
+    List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();
+
     for(PlanTaskLocationHint inputHint : locationHints){
       TaskLocationHint outputHint = new TaskLocationHint(
           new HashSet<String>(inputHint.getHostList()),
@@ -157,14 +158,14 @@ public class DagTypeConverters {
   }
 
   // notes re HDFS URL handling:
-  //   Resource URLs in the protobuf message are strings of the form hdfs://host:port/path 
+  //   Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
   //   org.apache.hadoop.fs.Path.Path  is actually a URI type that allows any scheme
   //   org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN.
   //   java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
-  
+
   public static String convertToDAGPlan(URL resource) {
     // see above notes on HDFS URL handling
-    String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort() 
+    String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
         + resource.getFile();
     return out;
   }
@@ -174,7 +175,7 @@ public class DagTypeConverters {
     Map<String, LocalResource> map = new HashMap<String, LocalResource>();
     for(PlanLocalResource res : localResourcesList){
       LocalResource r = new LocalResourcePBImpl();
-      
+
       //NOTE: have to check every optional field in protobuf generated classes for existence before accessing
       //else we will receive a default value back, eg ""
       if(res.hasPattern()){
@@ -191,16 +192,16 @@ public class DagTypeConverters {
   }
 
   public static Map<String, String> createEnvironmentMapFromDAGPlan(
-      List<PlanKeyValuePair> environmentSettingList) {  
-      
+      List<PlanKeyValuePair> environmentSettingList) {
+
     Map<String, String> map = new HashMap<String, String>();
     for(PlanKeyValuePair setting : environmentSettingList){
       map.put(setting.getKey(), setting.getValue());
     }
-    
+
     return map;
   }
-  
+
   public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
     Map<String, EdgePlan> edgePlanMap =
         new HashMap<String, EdgePlan>();
@@ -209,7 +210,7 @@ public class DagTypeConverters {
     }
     return edgePlanMap;
   }
-  
+
   public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
     return new EdgeProperty(
         convertFromDAGPlan(edge.getDataMovementType()),
@@ -246,7 +247,7 @@ public class DagTypeConverters {
     }
     return builder.build();
   }
-  
+
   public static NamedDescriptorProto convertToDAGPlan(
       NamedDescriptor<? extends TezEntityDescriptor> descriptor) {
     NamedDescriptorProto.Builder builder = NamedDescriptorProto.newBuilder();
@@ -284,4 +285,37 @@ public class DagTypeConverters {
     }
     return new ProcessorDescriptor(className).setUserPayload(bb);
   }
+
+  /**
+   * Converts the RPC-level session status into the client-facing enum.
+   * @throws TezUncheckedException if {@code proto} has no known mapping
+   */
+  public static TezSessionStatus convertTezSessionStatusFromProto(
+      TezSessionStatusProto proto) {
+    switch (proto) {
+    case INITIALIZING : return TezSessionStatus.INITIALIZING;
+    case READY : return TezSessionStatus.READY;
+    case RUNNING : return TezSessionStatus.RUNNING;
+    case SHUTDOWN : return TezSessionStatus.SHUTDOWN;
+    default : throw new TezUncheckedException(
+        "Could not convert to TezSessionStatus from proto: " + proto);
+    }
+  }
+
+  /**
+   * Converts the client-facing session status into its RPC-level enum.
+   * @throws TezUncheckedException if {@code status} has no known mapping
+   */
+  public static TezSessionStatusProto convertTezSessionStatusToProto(
+      TezSessionStatus status) {
+    switch (status) {
+    case INITIALIZING : return TezSessionStatusProto.INITIALIZING;
+    case READY : return TezSessionStatusProto.READY;
+    case RUNNING : return TezSessionStatusProto.RUNNING;
+    case SHUTDOWN : return TezSessionStatusProto.SHUTDOWN;
+    default : throw new TezUncheckedException(
+        "Could not convert TezSessionStatus to proto: " + status);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
index 6fcd1f8..1236190 100644
--- a/tez-api/src/main/proto/DAGClientAMProtocol.proto
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -71,6 +71,20 @@ message ShutdownSessionRequestProto {
 message ShutdownSessionResponseProto {
 }
 
+enum TezSessionStatusProto {  // status value carried by GetAMStatusResponseProto
+  INITIALIZING = 0;  // session is initializing itself
+  READY = 1;  // session is ready to receive DAG submissions
+  RUNNING = 2;  // session is running a DAG
+  SHUTDOWN = 3;  // session has shut down or is shutting down
+}
+
+message GetAMStatusRequestProto {  // request for the getAMStatus rpc; declares no fields
+}
+
+message GetAMStatusResponseProto {  // reply to the getAMStatus rpc
+  required TezSessionStatusProto status = 1;  // session status being reported
+}
+
 service DAGClientAMProtocol {
   rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
   rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
@@ -78,4 +92,5 @@ service DAGClientAMProtocol {
   rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
   rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
   rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
+  rpc getAMStatus (GetAMStatusRequestProto) returns (GetAMStatusResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 1b6d562..eb1ff48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -20,12 +20,16 @@ package org.apache.tez.dag.api.client.rpc;
 
 import java.util.List;
 
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
@@ -131,4 +135,17 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements
     return ShutdownSessionResponseProto.newBuilder().build();
   }
 
+  /** Serves getAMStatus: the delegate's session status, converted to proto. */
+  @Override
+  public GetAMStatusResponseProto getAMStatus(RpcController controller,
+      GetAMStatusRequestProto request) throws ServiceException {
+    try {
+      return GetAMStatusResponseProto.newBuilder().setStatus(
+          DagTypeConverters.convertTezSessionStatusToProto(
+              real.getSessionStatus())).build();
+    } catch (TezException e) {
+      throw wrapException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 85ebbfa..8d6fd1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -750,6 +751,28 @@ public class DAGAppMaster extends AbstractService {
       LOG.info("Received message to shutdown AM");
       shutdownTezAM();
     }
+
+    /** Maps the AM state to a session status; fails if not in session mode. */
+    public synchronized TezSessionStatus getSessionStatus() throws TezException {
+      if (!isSession) {
+        throw new TezException(
+            "Unsupported operation as AM not running in session mode");
+      }
+      switch (state) {
+      case IDLE:
+        return TezSessionStatus.READY;
+      case RUNNING:
+        return TezSessionStatus.RUNNING;
+      case ERROR:
+      case FAILED:
+      case SUCCEEDED:
+      case KILLED:
+        return TezSessionStatus.SHUTDOWN;
+      default:
+        // NEW, INITED and any state not listed above.
+        return TezSessionStatus.INITIALIZING;
+      }
+    }
   }
 
   private class RunningAppContext implements AppContext {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034e661a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index e2f2856..d9d56bf 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -54,6 +54,7 @@ import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -243,15 +244,25 @@ public class TestMRRJobsDAGApi {
         new TezSessionConfiguration(amConfig, tezConf);
     TezSession tezSession = new TezSession("testsession", tezSessionConfig);
     tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING,
+        tezSession.getSessionStatus());
 
     State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
         tezSession);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
     finalState = testMRRSleepJobDagSubmitCore(true, false, false,
         tezSession);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+
     ApplicationId appId = tezSession.getApplicationId();
     tezSession.stop();
+    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+        tezSession.getSessionStatus());
+
     YarnClient yarnClient = YarnClient.createYarnClient();
     yarnClient.init(mrrTezCluster.getConfig());
     yarnClient.start();
@@ -533,6 +544,8 @@ public class TestMRRJobsDAGApi {
       LOG.info("Submitting dag to tez session with appId="
           + tezSession.getApplicationId());
       dagClient = tezSession.submitDAG(dag);
+      Assert.assertEquals(TezSessionStatus.RUNNING,
+          tezSession.getSessionStatus());
     }
     DAGStatus dagStatus = dagClient.getDAGStatus();
     while (!dagStatus.isCompleted()) {