You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/22 06:46:30 UTC

tez git commit: TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master 4995bccdb -> 4b29ece20


TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4b29ece2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4b29ece2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4b29ece2

Branch: refs/heads/master
Commit: 4b29ece20afed83bd5b715119a0446f02550d9be
Parents: 4995bcc
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Jun 22 12:45:44 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Jun 22 12:45:44 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 12 +++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  2 +
 .../app/TestTaskAttemptListenerImplTezDag.java  | 58 +++++++++++++++++++-
 4 files changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 08f999f..25a2450 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -448,6 +448,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable
   TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
   TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
   TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.

http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 88b1dee..2114793 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -490,13 +490,23 @@ public class TezConfiguration extends Configuration {
   
   /**
    * String value. Range of ports that the AM can use when binding for client connections. Leave blank
-   * to use all possible ports. Expert level setting.
+   * to use all possible ports. Expert level setting. Uses the standard Hadoop range format,
+   * for example: 50000-50050,50100-50200
    */
   @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
       TEZ_AM_PREFIX + "client.am.port-range";
 
   /**
+   * String value. Range of ports that the AM can use when binding for task connections. Leave blank
+   * to use all possible ports. Expert level setting. Uses the standard Hadoop range format,
+   * for example: 50000-50050,50100-50200
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_TASK_AM_PORT_RANGE =
+      TEZ_AM_PREFIX + "task.am.port-range";
+
+  /**
    * String value. The class to be used for DAG Scheduling. Expert level setting.
    */
   @ConfigurationScope(Scope.DAG)

http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2bf7de3..fe92f3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -136,6 +136,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             .setNumHandlers(
                 conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
                     TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+            .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE)
             .setSecretManager(jobTokenSecretManager).build();
 
         // Enable service authorization?
@@ -147,6 +148,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
         server.start();
         this.address = NetUtils.getConnectAddress(server);
+        LOG.info("Instantiated TaskAttemptListener RPC at " + this.address);
       } catch (IOException e) {
         throw new TezUncheckedException(e);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ac816f4..5c24ecc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -30,7 +31,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -54,7 +58,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
@@ -234,6 +237,59 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(eventsToSend, response.getEvents());
   }
 
+  // Try up to 10 times to bind a randomly chosen port; fail if none of the attempts succeeds.
+  @Test (timeout = 5000)
+  public void testPortRange() {
+    boolean succeedToAllocate = false;
+    Random rand = new Random();
+    for (int i = 0; i < 10; ++i) {
+      int nextPort = 1024 + rand.nextInt(65535 - 1024);
+      if (testPortRange(nextPort)) {
+        succeedToAllocate = true;
+        break;
+      }
+    }
+    if (!succeedToAllocate) {
+      fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
+    }
+  }
+
+  @Test (timeout= 5000)
+  public void testPortRange_NotSpecified() {
+    Configuration conf = new Configuration();
+    taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+    // No exception should be thrown; the listener should start properly.
+    taskAttemptListener.init(conf);
+    taskAttemptListener.start();
+  }
+
+  private boolean testPortRange(int port) {
+    boolean succeedToAllocate = true;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+      taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
+          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+      taskAttemptListener.init(conf);
+      taskAttemptListener.start();
+      int resultedPort = taskAttemptListener.getAddress().getPort();
+      assertEquals(port, resultedPort);
+    } catch (Exception e) {
+      succeedToAllocate = false;
+    } finally {
+      if (taskAttemptListener != null) {
+        try {
+          taskAttemptListener.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+          fail("fail to stop TaskAttemptListener");
+        }
+      }
+    }
+    return succeedToAllocate;
+  }
+
   private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
       int fromEventId, int maxEvents, int nextFromEventId,
       List<TezEvent> sendEvents) throws IOException, TezException {