You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2022/02/21 11:27:21 UTC
[tez] branch branch-0.9 updated: TEZ-4351: ShuffleHandler port should respect value in config (#163) (Laszlo Bodor reviewed by Jonathan Eagles)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 59cbc77 TEZ-4351: ShuffleHandler port should respect value in config (#163) (Laszlo Bodor reviewed by Jonathan Eagles)
59cbc77 is described below
commit 59cbc77ae169bd8ac5a6a9192cc218d8fbf2f0ac
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Wed Jan 5 21:40:06 2022 +0100
TEZ-4351: ShuffleHandler port should respect value in config (#163) (Laszlo Bodor reviewed by Jonathan Eagles)
---
.../org/apache/tez/auxservices/ShuffleHandler.java | 7 +++--
.../apache/tez/auxservices/TestShuffleHandler.java | 33 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 610a260..d679fce 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -464,6 +464,10 @@ public class ShuffleHandler extends AuxiliaryService {
return jt;
}
+ public int getPort() {
+ return port;
+ }
+
@Override
public void initializeApplication(ApplicationInitializationContext context) {
@@ -537,7 +541,7 @@ public class ShuffleHandler extends AuxiliaryService {
return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
}
});
-
+ port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
super.serviceInit(new YarnConfiguration(conf));
}
@@ -556,7 +560,6 @@ public class ShuffleHandler extends AuxiliaryService {
conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
.childOption(ChannelOption.SO_KEEPALIVE, true);
initPipeline(bootstrap, conf);
- port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
Channel ch = bootstrap.bind().sync().channel();
accepted.add(ch);
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index ba8da1b..d674192 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
@@ -1419,4 +1420,36 @@ public class TestShuffleHandler {
}
return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
}
+
+ @Test
+ public void testConfigPortStatic() throws Exception {
+ Random rand = new Random();
+ int port = rand.nextInt(10) + 50000;
+ Configuration conf = new Configuration();
+ // provide a port for ShuffleHandler
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, port);
+ MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
+ shuffleHandler.serviceInit(conf);
+ try {
+ shuffleHandler.serviceStart();
+ Assert.assertEquals(port, shuffleHandler.getPort());
+ } finally {
+ shuffleHandler.stop();
+ }
+ }
+
+ @Test
+ public void testConfigPortDynamic() throws Exception {
+ Configuration conf = new Configuration();
+ // a configured port of 0 means the actual port should be chosen dynamically by netty
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
+ shuffleHandler.serviceInit(conf);
+ try {
+ shuffleHandler.serviceStart();
+ Assert.assertTrue("ShuffleHandler should use a random chosen port", shuffleHandler.getPort() > 0);
+ } finally {
+ shuffleHandler.stop();
+ }
+ }
}