You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2017/01/10 03:20:14 UTC

tez git commit: TEZ-3551: FrameworkClient created twice causing minor delay (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 89d47c325 -> d67faeb37


TEZ-3551: FrameworkClient created twice causing minor delay (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d67faeb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d67faeb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d67faeb3

Branch: refs/heads/master
Commit: d67faeb37c8d2600449431d4ab5dac1bfcd7b1fa
Parents: 89d47c3
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Jan 10 08:49:59 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Jan 10 08:49:59 2017 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/client/FrameworkClient.java  |  2 +
 .../java/org/apache/tez/client/TezClient.java   | 18 +++++++--
 .../org/apache/tez/client/TezYarnClient.java    |  9 +++++
 .../tez/dag/api/client/DAGClientImpl.java       | 23 +++++------
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 13 ++++++-
 .../tez/dag/api/client/rpc/TestDAGClient.java   | 23 +++++++----
 .../java/org/apache/tez/client/LocalClient.java |  5 +++
 .../org/apache/tez/test/TestFaultTolerance.java | 40 ++++++++++++++++++--
 9 files changed, 107 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6aa66f3..0fe0c88 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3551. FrameworkClient created twice causing minor delay.
   TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
   TEZ-3568. Update SecurityUtils configuration to pick user provided configuration.
   TEZ-3561. Fix wrong tez tarball name in install.md.

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index cb20f49..b3e084c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -78,4 +78,6 @@ public abstract class FrameworkClient {
 
   public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException;
 
+  public abstract boolean isRunning() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 29e7a8b..f4e9f10 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -547,7 +547,6 @@ public class TezClient {
       }
     }
 
-    TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration());
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
     DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
         usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
@@ -613,7 +612,9 @@ public class TezClient {
         + ", dagId=" + dagId
         + ", dagName=" + dag.getName());
     return new DAGClientImpl(sessionAppId, dagId,
-        dagClientConf, frameworkClient);
+        amConfig.getTezConfiguration(),
+        amConfig.getYarnConfiguration(),
+        frameworkClient);
   }
 
   /**
@@ -1030,7 +1031,8 @@ public class TezClient {
     }
     // wait for dag in non-session mode to start running, so that we can start to getDAGStatus
     waitNonSessionTillReady();
-    return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient);
+    return getDAGClient(appId, amConfig.getTezConfiguration(), amConfig.getYarnConfiguration(),
+        frameworkClient);
   }
 
   private ApplicationId createApplication() throws TezException, IOException {
@@ -1052,11 +1054,19 @@ public class TezClient {
     return cachedTezJarResources;
   }
 
+  @Private
+  static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, YarnConfiguration
+      yarnConf, FrameworkClient frameworkClient)
+      throws IOException, TezException {
+    return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf,
+        yarnConf, frameworkClient);
+  }
+
   @Private // Used only for MapReduce compatibility code
   static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
                                 FrameworkClient frameworkClient)
       throws IOException, TezException {
-    return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
+    return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), frameworkClient);
   }
 
   // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
index 3ac82ac..2a0c79a 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
@@ -38,6 +38,8 @@ public class TezYarnClient extends FrameworkClient {
 
   private final YarnClient yarnClient;
 
+  private volatile boolean isRunning;
+
   protected TezYarnClient(YarnClient yarnClient) {
     this.yarnClient = yarnClient;
   }
@@ -50,10 +52,12 @@ public class TezYarnClient extends FrameworkClient {
   @Override
   public void start() {
     yarnClient.start();
+    isRunning = true;
   }
 
   @Override
   public void stop() {
+    isRunning = false;
     yarnClient.stop();
   }
 
@@ -98,4 +102,9 @@ public class TezYarnClient extends FrameworkClient {
     }
     return report;
   }
+
+  @Override
+  public boolean isRunning() throws IOException {
+    return isRunning;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index af67ee8..4820b6e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -76,27 +76,28 @@ public class DAGClientImpl extends DAGClient {
       VertexStatus.State.ERROR);
   private long statusPollInterval;
   private long diagnoticsWaitTimeout;
+  private boolean cleanupFrameworkClient;
 
   public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
-                       @Nullable FrameworkClient frameworkClient) {
+      YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient) {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
-    if (frameworkClient != null &&
-        conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
+    if (frameworkClient != null) {
       this.frameworkClient = frameworkClient;
     } else {
       this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
-      this.frameworkClient.init(conf, new YarnConfiguration(conf));
+      this.frameworkClient.init(conf, yarnConf);
       this.frameworkClient.start();
+      cleanupFrameworkClient = true;
     }
     isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
-            .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
-            conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
-                 TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) &&
-            conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
-                 TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
-            DAGClientTimelineImpl.isSupported();
+        .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
+        conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
+            TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) &&
+        conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+            TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
+        DAGClientTimelineImpl.isSupported();
 
     realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
     statusPollInterval = conf.getLong(
@@ -341,7 +342,7 @@ public class DAGClientImpl extends DAGClient {
   @Override
   public void close() throws IOException {
     realClient.close();
-    if (frameworkClient != null) {
+    if (frameworkClient != null && cleanupFrameworkClient) {
       frameworkClient.stop();
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index ff48755..9eb9807 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -227,8 +227,15 @@ public class DAGClientRPCImpl extends DAGClientInternal {
 
   ApplicationReport getAppReport() throws IOException, TezException,
       ApplicationNotFoundException {
+    FrameworkClient client = null;
     try {
-      ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
+      ApplicationReport appReport = null;
+      if (!frameworkClient.isRunning()) {
+        client = FrameworkClient.createFrameworkClient(conf);
+        appReport = client.getApplicationReport(appId);
+      } else {
+        appReport = frameworkClient.getApplicationReport(appId);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("App: " + appId + " in state: "
             + appReport.getYarnApplicationState());
@@ -238,6 +245,10 @@ public class DAGClientRPCImpl extends DAGClientInternal {
       throw e;
     } catch (YarnException e) {
       throw new TezException(e);
+    } finally {
+      if (client != null) {
+        client.stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 674781e..e979f8b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.FrameworkClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -200,8 +201,10 @@ public class TestDAGClient {
       .thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithoutCounters).build());
     when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher())))
       .thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build());
-   
-    dagClient = new DAGClientImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
+
+    TezConfiguration tezConf = new TezConfiguration();
+    YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
+    dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf,  yarnConf, null);
     DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
     realClient.appReport = mockAppReport;
     realClient.proxy = mockProxy;
@@ -335,8 +338,10 @@ public class TestDAGClient {
 
     TezConfiguration tezConf = new TezConfiguration();
     tezConf.setLong(TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, 800l);
+    YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
 
-    DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null);
+    DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf,
+        yarnConf,null);
     DAGClientRPCImplForTest dagClientRpc =
         new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null);
     dagClient.setRealClient(dagClientRpc);
@@ -417,12 +422,14 @@ public class TestDAGClient {
                                      String loggingClass, boolean amHistoryLoggingEnabled,
                                      boolean dagHistoryLoggingEnabled) {
     TezConfiguration tezConf = new TezConfiguration();
+    YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
 
     tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, loggingClass);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, amHistoryLoggingEnabled);
     tezConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, dagHistoryLoggingEnabled);
 
-    DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf, null);
+    DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf,
+        yarnConf,null);
     assertEquals(expected, dagClient.getIsATSEnabled());
   }
 
@@ -466,10 +473,10 @@ public class TestDAGClient {
     private DAGStatus rmDagStatus;
     int numGetStatusViaRmInvocations = 0;
 
-    public DAGClientImplForTest(ApplicationId appId, String dagId,
-                                TezConfiguration conf,
-                                @Nullable FrameworkClient frameworkClient) {
-      super(appId, dagId, conf, frameworkClient);
+    public DAGClientImplForTest(ApplicationId appId, String dagId, TezConfiguration conf,
+        YarnConfiguration yarnConf,
+        @Nullable FrameworkClient frameworkClient) {
+      super(appId, dagId, conf, yarnConf, frameworkClient);
     }
 
     private void setRealClient(DAGClientRPCImplForTest dagClientRpcImplForTest) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 7c65c07..db7fc2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -147,6 +147,11 @@ public class LocalClient extends FrameworkClient {
   }
 
   @Override
+  public boolean isRunning() {
+    return true;
+  }
+
+  @Override
   public ApplicationReport getApplicationReport(ApplicationId appId) {
     ApplicationReport report = Records.newRecord(ApplicationReport.class);
     report.setApplicationId(appId);

http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index b2a5d17..08bac0d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -72,7 +72,8 @@ public class TestFaultTolerance {
   protected static MiniDFSCluster dfsCluster;
   
   private static TezClient tezSession = null;
-  
+  private static TezConfiguration tezConf;
+
   @BeforeClass
   public static void setup() throws Exception {
     LOG.info("Starting mini clusters");
@@ -97,7 +98,7 @@ public class TestFaultTolerance {
           .valueOf(new Random().nextInt(100000))));
       TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
       
-      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf = new TezConfiguration(miniTezCluster.getConfig());
       tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
           remoteStagingDir.toString());
       tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
@@ -160,7 +161,40 @@ public class TestFaultTolerance {
       Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics));
     }
   }
-  
+
+  @Test (timeout=600000)
+  public void testSessionStopped() throws Exception {
+    Configuration testConf = new Configuration(false);
+
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+
+    // verify value at v2 task1
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
+    DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
+    tezSession.waitTillReady();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    dagClient.waitForCompletion();
+    // kill the session now
+    tezSession.stop();
+
+    // Check if killing DAG does not throw any exception
+    dagClient.tryKillDAG();
+
+    // restart the session for rest of the tests
+    tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+    tezSession.start();
+  }
+
   @Test (timeout=60000)
   public void testBasicSuccessScatterGather() throws Exception {
     DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", null);