You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/22 06:46:30 UTC
tez git commit: TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 4995bccdb -> 4b29ece20
TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4b29ece2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4b29ece2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4b29ece2
Branch: refs/heads/master
Commit: 4b29ece20afed83bd5b715119a0446f02550d9be
Parents: 4995bcc
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Jun 22 12:45:44 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Mon Jun 22 12:45:44 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +++-
.../dag/app/TaskAttemptListenerImpTezDag.java | 2 +
.../app/TestTaskAttemptListenerImplTezDag.java | 58 +++++++++++++++++++-
4 files changed, 71 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 08f999f..25a2450 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -448,6 +448,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable
TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 88b1dee..2114793 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -490,13 +490,23 @@ public class TezConfiguration extends Configuration {
/**
* String value. Range of ports that the AM can use when binding for client connections. Leave blank
- * to use all possible ports. Expert level setting.
+ * to use all possible ports. Expert level setting. Uses the standard Hadoop range format.
+ * For example: 50000-50050,50100-50200
*/
@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
TEZ_AM_PREFIX + "client.am.port-range";
/**
+ * String value. Range of ports that the AM can use when binding for task connections. Leave blank
+ * to use all possible ports. Expert level setting. Uses the standard Hadoop range format.
+ * For example: 50000-50050,50100-50200
+ */
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_TASK_AM_PORT_RANGE =
+ TEZ_AM_PREFIX + "task.am.port-range";
+
+ /**
* String value. The class to be used for DAG Scheduling. Expert level setting.
*/
@ConfigurationScope(Scope.DAG)
http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2bf7de3..fe92f3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -136,6 +136,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
.setNumHandlers(
conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+ .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE)
.setSecretManager(jobTokenSecretManager).build();
// Enable service authorization?
@@ -147,6 +148,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
server.start();
this.address = NetUtils.getConnectAddress(server);
+ LOG.info("Instantiated TaskAttemptListener RPC at " + this.address);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b29ece2/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ac816f4..5c24ecc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -30,7 +31,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
@@ -54,7 +58,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
@@ -234,6 +237,59 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals(eventsToSend, response.getEvents());
}
+ // Try up to 10 times to allocate a random port; fail only if none of the attempts succeeds.
+ @Test (timeout = 5000)
+ public void testPortRange() {
+ boolean succeedToAllocate = false;
+ Random rand = new Random();
+ for (int i = 0; i < 10; ++i) {
+ int nextPort = 1024 + rand.nextInt(65535 - 1024);
+ if (testPortRange(nextPort)) {
+ succeedToAllocate = true;
+ break;
+ }
+ }
+ if (!succeedToAllocate) {
+ fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
+ }
+ }
+
+ @Test (timeout= 5000)
+ public void testPortRange_NotSpecified() {
+ Configuration conf = new Configuration();
+ taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+ // no exception should be thrown; the listener should start properly
+ taskAttemptListener.init(conf);
+ taskAttemptListener.start();
+ }
+
+ private boolean testPortRange(int port) {
+ boolean succeedToAllocate = true;
+ try {
+ Configuration conf = new Configuration();
+ conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+ taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+ taskAttemptListener.init(conf);
+ taskAttemptListener.start();
+ int resultedPort = taskAttemptListener.getAddress().getPort();
+ assertEquals(port, resultedPort);
+ } catch (Exception e) {
+ succeedToAllocate = false;
+ } finally {
+ if (taskAttemptListener != null) {
+ try {
+ taskAttemptListener.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("fail to stop TaskAttemptListener");
+ }
+ }
+ }
+ return succeedToAllocate;
+ }
+
private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
int fromEventId, int maxEvents, int nextFromEventId,
List<TezEvent> sendEvents) throws IOException, TezException {