You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/20 20:45:37 UTC
tez git commit: TEZ-2629. LimitExceededException in Tez client when
DAG exceeds the default max counters. (sseth)
Repository: tez
Updated Branches:
refs/heads/branch-0.6 86309f93d -> 331671516
TEZ-2629. LimitExceededException in Tez client when DAG exceeds the
default max counters. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/33167151
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/33167151
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/33167151
Branch: refs/heads/branch-0.6
Commit: 3316715160514bc971d7289e4a0db6b9da08d171
Parents: 86309f9
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 20 11:45:27 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 11:45:27 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tez/client/TezClient.java | 2 +
.../org/apache/tez/common/counters/Limits.java | 10 ++-
.../org/apache/tez/client/TestTezClient.java | 68 +++++++++++++++-----
4 files changed, 65 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4808e5b..3fcec25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
TEZ-2630. TezChild receives IP address instead of FQDN.
@@ -223,6 +224,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error
http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index b338112..cd02ba1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -149,6 +150,7 @@ public class TezClient {
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
this.apiVersionInfo = new TezApiVersionInfo();
+ Limits.setConfiguration(tezConf);
LOG.info("Tez Client Version: " + apiVersionInfo.toString());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
index 38e825a..f88c074 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
@@ -18,6 +18,7 @@
package org.apache.tez.common.counters;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,7 +42,7 @@ public class Limits {
return;
}
if (conf == null) {
- conf = new Configuration();
+ conf = new TezConfiguration();
}
GROUP_NAME_MAX =
conf.getInt(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH,
@@ -116,4 +117,11 @@ public class Limits {
}
}
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public synchronized static void reset() {
+ conf = null;
+ initialized = false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 14d4d7f..ee20e87 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.counters.LimitExceededException;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -98,12 +102,19 @@ public class TestTezClient {
}
}
- TezClientForTest configure() throws YarnException, IOException {
- return configure(new HashMap<String, LocalResource>(), true);
+ TezClientForTest configureAndCreateTezClient() throws YarnException, IOException {
+ return configureAndCreateTezClient(null);
}
- TezClientForTest configure(Map<String, LocalResource> lrs, boolean isSession) throws YarnException, IOException {
- TezConfiguration conf = new TezConfiguration();
+ TezClientForTest configureAndCreateTezClient(TezConfiguration conf) throws YarnException, IOException {
+ return configureAndCreateTezClient(new HashMap<String, LocalResource>(), true, conf);
+ }
+
+ TezClientForTest configureAndCreateTezClient(Map<String, LocalResource> lrs, boolean isSession,
+ TezConfiguration conf) throws YarnException, IOException {
+ if (conf == null) {
+ conf = new TezConfiguration();
+ }
conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
@@ -138,7 +149,7 @@ public class TestTezClient {
lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
- TezClientForTest client = configure(lrs, isSession);
+ TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null);
ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -244,7 +255,7 @@ public class TestTezClient {
@Test (timeout=5000)
public void testPreWarm() throws Exception {
- TezClientForTest client = configure();
+ TezClientForTest client = configureAndCreateTezClient();
client.start();
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -272,7 +283,8 @@ public class TestTezClient {
}
public void testMultipleSubmissionsJob(boolean isSession) throws Exception {
- TezClientForTest client1 = configure(new HashMap<String, LocalResource>(), isSession);
+ TezClientForTest client1 = configureAndCreateTezClient(new HashMap<String, LocalResource>(),
+ isSession, null);
when(client1.mockYarnClient.getApplicationReport(client1.mockAppId).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
client1.start();
@@ -292,7 +304,7 @@ public class TestTezClient {
// the dag resource will be added to the vertex once
client1.submitDAG(dag);
- TezClientForTest client2 = configure();
+ TezClientForTest client2 = configureAndCreateTezClient();
when(client2.mockYarnClient.getApplicationReport(client2.mockAppId).getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
client2.start();
@@ -307,7 +319,7 @@ public class TestTezClient {
@Test(timeout = 5000)
public void testWaitTillReady_Interrupt() throws Exception {
- final TezClientForTest client = configure();
+ final TezClientForTest client = configureAndCreateTezClient();
client.start();
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -327,13 +339,13 @@ public class TestTezClient {
thread.join(250);
thread.interrupt();
thread.join();
- Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class));
+ Assert.assertThat(exceptionReference.get(), CoreMatchers.instanceOf(InterruptedException.class));
client.stop();
}
@Test(timeout = 5000)
public void testWaitTillReadyAppFailed() throws Exception {
- final TezClientForTest client = configure();
+ final TezClientForTest client = configureAndCreateTezClient();
client.start();
String msg = "Application Test Failed";
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -342,7 +354,7 @@ public class TestTezClient {
msg);
try {
client.waitTillReady();
- Assert.fail();
+ fail();
} catch (SessionNotRunning e) {
Assert.assertTrue(e.getMessage().contains(msg));
}
@@ -351,13 +363,13 @@ public class TestTezClient {
@Test(timeout = 5000)
public void testWaitTillReadyAppFailedNoDiagnostics() throws Exception {
- final TezClientForTest client = configure();
+ final TezClientForTest client = configureAndCreateTezClient();
client.start();
when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
.thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.FAILED);
try {
client.waitTillReady();
- Assert.fail();
+ fail();
} catch (SessionNotRunning e) {
Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
}
@@ -366,7 +378,7 @@ public class TestTezClient {
@Test(timeout = 5000)
public void testSubmitDAGAppFailed() throws Exception {
- final TezClientForTest client = configure();
+ final TezClientForTest client = configureAndCreateTezClient();
client.start();
client.callRealGetSessionAMProxy = true;
@@ -382,11 +394,35 @@ public class TestTezClient {
try {
client.submitDAG(dag);
- Assert.fail();
+ fail();
} catch (SessionNotRunning e) {
Assert.assertTrue(e.getMessage().contains(msg));
}
client.stop();
}
+ @Test(timeout = 5000)
+ public void testTezClientCounterLimits() throws YarnException, IOException {
+ Limits.reset();
+ int defaultCounterLimit = TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT;
+
+ int newCounterLimit = defaultCounterLimit + 500;
+
+ TezConfiguration conf = new TezConfiguration();
+ conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, newCounterLimit);
+
+ configureAndCreateTezClient(conf);
+
+ TezCounters counters = new TezCounters();
+ for (int i = 0 ; i < newCounterLimit ; i++) {
+ counters.findCounter("GroupName", "TestCounter" + i).setValue(i);
+ }
+
+ try {
+ counters.findCounter("GroupName", "TestCounterFail").setValue(1);
+ fail("Expecting a LimitExceedException - too many counters");
+ } catch (LimitExceededException e) {
+ }
+ }
+
}