You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2017/01/10 03:20:14 UTC
tez git commit: TEZ-3551: FrameworkClient created twice causing minor
delay (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 89d47c325 -> d67faeb37
TEZ-3551: FrameworkClient created twice causing minor delay (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d67faeb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d67faeb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d67faeb3
Branch: refs/heads/master
Commit: d67faeb37c8d2600449431d4ab5dac1bfcd7b1fa
Parents: 89d47c3
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue Jan 10 08:49:59 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue Jan 10 08:49:59 2017 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/FrameworkClient.java | 2 +
.../java/org/apache/tez/client/TezClient.java | 18 +++++++--
.../org/apache/tez/client/TezYarnClient.java | 9 +++++
.../tez/dag/api/client/DAGClientImpl.java | 23 +++++------
.../dag/api/client/rpc/DAGClientRPCImpl.java | 13 ++++++-
.../tez/dag/api/client/rpc/TestDAGClient.java | 23 +++++++----
.../java/org/apache/tez/client/LocalClient.java | 5 +++
.../org/apache/tez/test/TestFaultTolerance.java | 40 ++++++++++++++++++--
9 files changed, 107 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6aa66f3..0fe0c88 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3551. FrameworkClient created twice causing minor delay.
TEZ-3566. Avoid caching fs isntances in TokenCache after a point.
TEZ-3568. Update SecurityUtils configuration to pick user provided configuration.
TEZ-3561. Fix wrong tez tarball name in install.md.
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
index cb20f49..b3e084c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java
@@ -78,4 +78,6 @@ public abstract class FrameworkClient {
public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException;
+ public abstract boolean isRunning() throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 29e7a8b..f4e9f10 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -547,7 +547,6 @@ public class TezClient {
}
}
- TezConfiguration dagClientConf = new TezConfiguration(amConfig.getTezConfiguration());
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
@@ -613,7 +612,9 @@ public class TezClient {
+ ", dagId=" + dagId
+ ", dagName=" + dag.getName());
return new DAGClientImpl(sessionAppId, dagId,
- dagClientConf, frameworkClient);
+ amConfig.getTezConfiguration(),
+ amConfig.getYarnConfiguration(),
+ frameworkClient);
}
/**
@@ -1030,7 +1031,8 @@ public class TezClient {
}
// wait for dag in non-session mode to start running, so that we can start to getDAGStatus
waitNonSessionTillReady();
- return getDAGClient(appId, amConfig.getTezConfiguration(), frameworkClient);
+ return getDAGClient(appId, amConfig.getTezConfiguration(), amConfig.getYarnConfiguration(),
+ frameworkClient);
}
private ApplicationId createApplication() throws TezException, IOException {
@@ -1052,11 +1054,19 @@ public class TezClient {
return cachedTezJarResources;
}
+ @Private
+ static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf, YarnConfiguration
+ yarnConf, FrameworkClient frameworkClient)
+ throws IOException, TezException {
+ return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf,
+ yarnConf, frameworkClient);
+ }
+
@Private // Used only for MapReduce compatibility code
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient)
throws IOException, TezException {
- return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
+ return getDAGClient(appId, tezConf, new YarnConfiguration(tezConf), frameworkClient);
}
// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
index 3ac82ac..2a0c79a 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
@@ -38,6 +38,8 @@ public class TezYarnClient extends FrameworkClient {
private final YarnClient yarnClient;
+ private volatile boolean isRunning;
+
protected TezYarnClient(YarnClient yarnClient) {
this.yarnClient = yarnClient;
}
@@ -50,10 +52,12 @@ public class TezYarnClient extends FrameworkClient {
@Override
public void start() {
yarnClient.start();
+ isRunning = true;
}
@Override
public void stop() {
+ isRunning = false;
yarnClient.stop();
}
@@ -98,4 +102,9 @@ public class TezYarnClient extends FrameworkClient {
}
return report;
}
+
+ @Override
+ public boolean isRunning() throws IOException {
+ return isRunning;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index af67ee8..4820b6e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -76,27 +76,28 @@ public class DAGClientImpl extends DAGClient {
VertexStatus.State.ERROR);
private long statusPollInterval;
private long diagnoticsWaitTimeout;
+ private boolean cleanupFrameworkClient;
public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
- @Nullable FrameworkClient frameworkClient) {
+ YarnConfiguration yarnConf, @Nullable FrameworkClient frameworkClient) {
this.appId = appId;
this.dagId = dagId;
this.conf = conf;
- if (frameworkClient != null &&
- conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
+ if (frameworkClient != null) {
this.frameworkClient = frameworkClient;
} else {
this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
- this.frameworkClient.init(conf, new YarnConfiguration(conf));
+ this.frameworkClient.init(conf, yarnConf);
this.frameworkClient.start();
+ cleanupFrameworkClient = true;
}
isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
- .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
- conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
- TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) &&
- conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
- TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
- DAGClientTimelineImpl.isSupported();
+ .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
+ conf.getBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
+ TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT) &&
+ conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED,
+ TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT) &&
+ DAGClientTimelineImpl.isSupported();
realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
statusPollInterval = conf.getLong(
@@ -341,7 +342,7 @@ public class DAGClientImpl extends DAGClient {
@Override
public void close() throws IOException {
realClient.close();
- if (frameworkClient != null) {
+ if (frameworkClient != null && cleanupFrameworkClient) {
frameworkClient.stop();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index ff48755..9eb9807 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -227,8 +227,15 @@ public class DAGClientRPCImpl extends DAGClientInternal {
ApplicationReport getAppReport() throws IOException, TezException,
ApplicationNotFoundException {
+ FrameworkClient client = null;
try {
- ApplicationReport appReport = frameworkClient.getApplicationReport(appId);
+ ApplicationReport appReport = null;
+ if (!frameworkClient.isRunning()) {
+ client = FrameworkClient.createFrameworkClient(conf);
+ appReport = client.getApplicationReport(appId);
+ } else {
+ appReport = frameworkClient.getApplicationReport(appId);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("App: " + appId + " in state: "
+ appReport.getYarnApplicationState());
@@ -238,6 +245,10 @@ public class DAGClientRPCImpl extends DAGClientInternal {
throw e;
} catch (YarnException e) {
throw new TezException(e);
+ } finally {
+ if (client != null) {
+ client.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 674781e..e979f8b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -34,6 +34,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.dag.api.TezConfiguration;
@@ -200,8 +201,10 @@ public class TestDAGClient {
.thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithoutCounters).build());
when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher())))
.thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build());
-
- dagClient = new DAGClientImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
+
+ TezConfiguration tezConf = new TezConfiguration();
+ YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
+ dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf, yarnConf, null);
DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
realClient.appReport = mockAppReport;
realClient.proxy = mockProxy;
@@ -335,8 +338,10 @@ public class TestDAGClient {
TezConfiguration tezConf = new TezConfiguration();
tezConf.setLong(TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, 800l);
+ YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
- DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null);
+ DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf,
+ yarnConf,null);
DAGClientRPCImplForTest dagClientRpc =
new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null);
dagClient.setRealClient(dagClientRpc);
@@ -417,12 +422,14 @@ public class TestDAGClient {
String loggingClass, boolean amHistoryLoggingEnabled,
boolean dagHistoryLoggingEnabled) {
TezConfiguration tezConf = new TezConfiguration();
+ YarnConfiguration yarnConf = new YarnConfiguration(tezConf);
tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, loggingClass);
tezConf.setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, amHistoryLoggingEnabled);
tezConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, dagHistoryLoggingEnabled);
- DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf, null);
+ DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf,
+ yarnConf,null);
assertEquals(expected, dagClient.getIsATSEnabled());
}
@@ -466,10 +473,10 @@ public class TestDAGClient {
private DAGStatus rmDagStatus;
int numGetStatusViaRmInvocations = 0;
- public DAGClientImplForTest(ApplicationId appId, String dagId,
- TezConfiguration conf,
- @Nullable FrameworkClient frameworkClient) {
- super(appId, dagId, conf, frameworkClient);
+ public DAGClientImplForTest(ApplicationId appId, String dagId, TezConfiguration conf,
+ YarnConfiguration yarnConf,
+ @Nullable FrameworkClient frameworkClient) {
+ super(appId, dagId, conf, yarnConf, frameworkClient);
}
private void setRealClient(DAGClientRPCImplForTest dagClientRpcImplForTest) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 7c65c07..db7fc2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -147,6 +147,11 @@ public class LocalClient extends FrameworkClient {
}
@Override
+ public boolean isRunning() {
+ return true;
+ }
+
+ @Override
public ApplicationReport getApplicationReport(ApplicationId appId) {
ApplicationReport report = Records.newRecord(ApplicationReport.class);
report.setApplicationId(appId);
http://git-wip-us.apache.org/repos/asf/tez/blob/d67faeb3/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index b2a5d17..08bac0d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -72,7 +72,8 @@ public class TestFaultTolerance {
protected static MiniDFSCluster dfsCluster;
private static TezClient tezSession = null;
-
+ private static TezConfiguration tezConf;
+
@BeforeClass
public static void setup() throws Exception {
LOG.info("Starting mini clusters");
@@ -97,7 +98,7 @@ public class TestFaultTolerance {
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
- TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
@@ -160,7 +161,40 @@ public class TestFaultTolerance {
Assert.assertTrue(Joiner.on(":").join(dagStatus.getDiagnostics()).contains(diagnostics));
}
}
-
+
+ @Test (timeout=600000)
+ public void testSessionStopped() throws Exception {
+ Configuration testConf = new Configuration(false);
+
+ testConf.setBoolean(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+ testConf.set(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+ testConf.setInt(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+
+ // verify value at v2 task1
+ testConf.set(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+
+ testConf.setInt(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
+ DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
+ tezSession.waitTillReady();
+
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ dagClient.waitForCompletion();
+ // kill the session now
+ tezSession.stop();
+
+ // Verify that killing the DAG after the session has been stopped does not throw an exception
+ dagClient.tryKillDAG();
+
+ // restart the session for rest of the tests
+ tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+ tezSession.start();
+ }
+
@Test (timeout=60000)
public void testBasicSuccessScatterGather() throws Exception {
DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", null);