You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/20 20:45:37 UTC

tez git commit: TEZ-2629. LimitExceededException in Tez client when DAG exceeds the default max counters. (sseth)

Repository: tez
Updated Branches:
  refs/heads/branch-0.6 86309f93d -> 331671516


TEZ-2629. LimitExceededException in Tez client when DAG exceeds the
default max counters. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/33167151
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/33167151
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/33167151

Branch: refs/heads/branch-0.6
Commit: 3316715160514bc971d7289e4a0db6b9da08d171
Parents: 86309f9
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 20 11:45:27 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 11:45:27 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tez/client/TezClient.java   |  2 +
 .../org/apache/tez/common/counters/Limits.java  | 10 ++-
 .../org/apache/tez/client/TestTezClient.java    | 68 +++++++++++++++-----
 4 files changed, 65 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4808e5b..3fcec25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2719. Consider reducing logs in unordered fetcher with shared-fetch option
   TEZ-2630. TezChild receives IP address instead of FQDN.
 
@@ -223,6 +224,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the default max counters
   TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
   TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error

http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index b338112..cd02ba1 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -149,6 +150,7 @@ public class TezClient {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
     this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
     this.apiVersionInfo = new TezApiVersionInfo();
+    Limits.setConfiguration(tezConf);
 
     LOG.info("Tez Client Version: " + apiVersionInfo.toString());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
index 38e825a..f88c074 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/Limits.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.common.counters;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,7 +42,7 @@ public class Limits {
       return;
     }
     if (conf == null) {
-      conf = new Configuration();
+      conf = new TezConfiguration();
     }
     GROUP_NAME_MAX =
         conf.getInt(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH,
@@ -116,4 +117,11 @@ public class Limits {
     }
   }
 
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public synchronized static void reset() {
+    conf = null;
+    initialized = false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/33167151/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 14d4d7f..ee20e87 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.counters.LimitExceededException;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -98,12 +102,19 @@ public class TestTezClient {
     }
   }
   
-  TezClientForTest configure() throws YarnException, IOException {
-    return configure(new HashMap<String, LocalResource>(), true);
+  TezClientForTest configureAndCreateTezClient() throws YarnException, IOException {
+    return configureAndCreateTezClient(null);
   }
   
-  TezClientForTest configure(Map<String, LocalResource> lrs, boolean isSession) throws YarnException, IOException {
-    TezConfiguration conf = new TezConfiguration();
+  TezClientForTest configureAndCreateTezClient(TezConfiguration conf) throws YarnException, IOException {
+    return configureAndCreateTezClient(new HashMap<String, LocalResource>(), true, conf);
+  }
+  
+  TezClientForTest configureAndCreateTezClient(Map<String, LocalResource> lrs, boolean isSession,
+                                               TezConfiguration conf) throws YarnException, IOException {
+    if (conf == null) {
+      conf = new TezConfiguration();
+    }
     conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
     conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
     TezClientForTest client = new TezClientForTest("test", conf, lrs, null);
@@ -138,7 +149,7 @@ public class TestTezClient {
     lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
     
-    TezClientForTest client = configure(lrs, isSession);
+    TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null);
     
     ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class);
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -244,7 +255,7 @@ public class TestTezClient {
   
   @Test (timeout=5000)
   public void testPreWarm() throws Exception {
-    TezClientForTest client = configure();
+    TezClientForTest client = configureAndCreateTezClient();
     client.start();
 
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -272,7 +283,8 @@ public class TestTezClient {
   }
   
   public void testMultipleSubmissionsJob(boolean isSession) throws Exception {
-    TezClientForTest client1 = configure(new HashMap<String, LocalResource>(), isSession);
+    TezClientForTest client1 = configureAndCreateTezClient(new HashMap<String, LocalResource>(),
+        isSession, null);
     when(client1.mockYarnClient.getApplicationReport(client1.mockAppId).getYarnApplicationState())
     .thenReturn(YarnApplicationState.RUNNING);
     client1.start();
@@ -292,7 +304,7 @@ public class TestTezClient {
     // the dag resource will be added to the vertex once
     client1.submitDAG(dag);
     
-    TezClientForTest client2 = configure();
+    TezClientForTest client2 = configureAndCreateTezClient();
     when(client2.mockYarnClient.getApplicationReport(client2.mockAppId).getYarnApplicationState())
     .thenReturn(YarnApplicationState.RUNNING);
     client2.start();
@@ -307,7 +319,7 @@ public class TestTezClient {
   
   @Test(timeout = 5000)
   public void testWaitTillReady_Interrupt() throws Exception {
-    final TezClientForTest client = configure();
+    final TezClientForTest client = configureAndCreateTezClient();
     client.start();
 
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -327,13 +339,13 @@ public class TestTezClient {
     thread.join(250);
     thread.interrupt();
     thread.join();
-    Assert.assertThat(exceptionReference.get(),CoreMatchers. instanceOf(InterruptedException.class));
+    Assert.assertThat(exceptionReference.get(), CoreMatchers.instanceOf(InterruptedException.class));
     client.stop();
   }
   
   @Test(timeout = 5000)
   public void testWaitTillReadyAppFailed() throws Exception {
-    final TezClientForTest client = configure();
+    final TezClientForTest client = configureAndCreateTezClient();
     client.start();
     String msg = "Application Test Failed";
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
@@ -342,7 +354,7 @@ public class TestTezClient {
         msg);
     try {
       client.waitTillReady();
-      Assert.fail();
+      fail();
     } catch (SessionNotRunning e) {
       Assert.assertTrue(e.getMessage().contains(msg));
     }
@@ -351,13 +363,13 @@ public class TestTezClient {
   
   @Test(timeout = 5000)
   public void testWaitTillReadyAppFailedNoDiagnostics() throws Exception {
-    final TezClientForTest client = configure();
+    final TezClientForTest client = configureAndCreateTezClient();
     client.start();
     when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState())
         .thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.FAILED);
     try {
       client.waitTillReady();
-      Assert.fail();
+      fail();
     } catch (SessionNotRunning e) {
       Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG));
     }
@@ -366,7 +378,7 @@ public class TestTezClient {
   
   @Test(timeout = 5000)
   public void testSubmitDAGAppFailed() throws Exception {
-    final TezClientForTest client = configure();
+    final TezClientForTest client = configureAndCreateTezClient();
     client.start();
     
     client.callRealGetSessionAMProxy = true;
@@ -382,11 +394,35 @@ public class TestTezClient {
     
     try {
       client.submitDAG(dag);
-      Assert.fail();
+      fail();
     } catch (SessionNotRunning e) {
       Assert.assertTrue(e.getMessage().contains(msg));
     }
     client.stop();
   }
 
+  @Test(timeout = 5000)
+  public void testTezClientCounterLimits() throws YarnException, IOException {
+    Limits.reset();
+    int defaultCounterLimit = TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT;
+
+    int newCounterLimit = defaultCounterLimit + 500;
+
+    TezConfiguration conf = new TezConfiguration();
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, newCounterLimit);
+
+    configureAndCreateTezClient(conf);
+
+    TezCounters counters = new TezCounters();
+    for (int i = 0 ; i < newCounterLimit ; i++) {
+      counters.findCounter("GroupName", "TestCounter" + i).setValue(i);
+    }
+
+    try {
+      counters.findCounter("GroupName", "TestCounterFail").setValue(1);
+      fail("Expecting a LimitExceedException - too many counters");
+    } catch (LimitExceededException e) {
+    }
+  }
+
 }