You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/04/06 09:55:14 UTC
[hadoop] branch branch-3.3 updated: HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.
This is an automated email from the ASF dual-hosted git repository.
aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new a6fb77f7eb9 HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.
a6fb77f7eb9 is described below
commit a6fb77f7eb91548042594fe9f7aab11766f61919
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Mar 4 22:17:48 2022 +0530
HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (#4028). Contributed by Viraj Jasani.
(cherry picked from commit 278568203b9c2033743ecca60dbc62d397a85a8d)
---
.../main/java/org/apache/hadoop/net/NetUtils.java | 26 ++++++
.../hadoop/hdfs/qjournal/MiniJournalCluster.java | 41 ++++++++-
.../hdfs/qjournal/TestMiniJournalCluster.java | 98 +++++++++++++++++++++-
3 files changed, 161 insertions(+), 4 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index f1e209fe1f9..70069460365 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -1035,6 +1035,32 @@ public class NetUtils {
return port;
}
+  /**
+   * Acquire the requested number of distinct free ports. Nothing reserves the
+   * ports once they are returned, so callers should bind to them right away.
+   * On success the returned set holds exactly {@code numOfPorts} entries. The
+   * argument must be in the range 1 to 25.
+   *
+   * @param numOfPorts Number of free ports to acquire.
+   * @return Free ports for binding a local socket.
+   * @throws IllegalStateException if the requested number of distinct ports
+   *     could not be collected within {@code numOfPorts * 5} attempts.
+   */
+  public static Set<Integer> getFreeSocketPorts(int numOfPorts) {
+    Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25,
+        "Valid range for num of ports is between 0 and 26");
+    final Set<Integer> acquired = new HashSet<>(numOfPorts);
+    // Bounded retry budget: a result of 0 is skipped, and a repeated port
+    // leaves the set size unchanged, so both simply use up an attempt.
+    int attemptsLeft = numOfPorts * 5;
+    while (attemptsLeft-- > 0) {
+      final int candidate = getFreeSocketPort();
+      if (candidate != 0) {
+        acquired.add(candidate);
+        if (acquired.size() == numOfPorts) {
+          return acquired;
+        }
+      }
+    }
+    throw new IllegalStateException(numOfPorts + " free ports could not be acquired.");
+  }
+
/**
* Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
* than returns null.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index 1c5a5dd6b84..15f8f1164a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
import static org.junit.Assert.fail;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -44,13 +45,16 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.test.GenericTestUtils;
-public class MiniJournalCluster {
+public final class MiniJournalCluster implements Closeable {
+
public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
public static class Builder {
private String baseDir;
private int numJournalNodes = 3;
private boolean format = true;
private final Configuration conf;
+ private int[] httpPorts = null;
+ private int[] rpcPorts = null;
static {
DefaultMetricsSystem.setMiniClusterMode(true);
@@ -75,6 +79,16 @@ public class MiniJournalCluster {
return this;
}
+    /**
+     * Set the HTTP ports for the JournalNodes. The port at index {@code i} is
+     * used for the JournalNode at index {@code i}, so the number of ports must
+     * match the number of JournalNodes or building the cluster fails with an
+     * {@link IllegalArgumentException}. When no ports are set (or {@code null}
+     * is passed), every JournalNode is configured with port 0.
+     *
+     * @param ports HTTP ports, one per JournalNode; may be {@code null}.
+     * @return this builder.
+     */
+    public Builder setHttpPorts(int... ports) {
+      // Defensive copy: later writes to the caller's array must not change
+      // the ports this builder hands to the cluster.
+      this.httpPorts = ports == null ? null : ports.clone();
+      return this;
+    }
+
+    /**
+     * Set the RPC ports for the JournalNodes. The port at index {@code i} is
+     * used for the JournalNode at index {@code i}, so the number of ports must
+     * match the number of JournalNodes or building the cluster fails with an
+     * {@link IllegalArgumentException}. When no ports are set (or {@code null}
+     * is passed), every JournalNode is configured with port 0.
+     *
+     * @param ports RPC ports, one per JournalNode; may be {@code null}.
+     * @return this builder.
+     */
+    public Builder setRpcPorts(int... ports) {
+      // Defensive copy: later writes to the caller's array must not change
+      // the ports this builder hands to the cluster.
+      this.rpcPorts = ports == null ? null : ports.clone();
+      return this;
+    }
+
public MiniJournalCluster build() throws IOException {
return new MiniJournalCluster(this);
}
@@ -98,6 +112,19 @@ public class MiniJournalCluster {
private final JNInfo[] nodes;
private MiniJournalCluster(Builder b) throws IOException {
+
+ if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
+ throw new IllegalArgumentException(
+ "Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
+ + b.numJournalNodes + ")");
+ }
+
+ if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
+ throw new IllegalArgumentException(
+ "Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
+ + b.numJournalNodes + ")");
+ }
+
LOG.info("Starting MiniJournalCluster with " +
b.numJournalNodes + " journal nodes");
@@ -172,8 +199,10 @@ public class MiniJournalCluster {
Configuration conf = new Configuration(b.conf);
File logDir = getStorageDir(idx);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
- conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
- conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
+ int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
+ int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
return conf;
}
@@ -273,4 +302,10 @@ public class MiniJournalCluster {
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
}
}
+
+  /**
+   * Close this cluster by delegating to {@code shutdown()}, which allows a
+   * MiniJournalCluster to be managed by a try-with-resources statement.
+   *
+   * @throws IOException as permitted by {@link Closeable#close()}.
+   */
+  @Override
+  public void close() throws IOException {
+    shutdown();
+  }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
index cace7c92891..ccbbc94c99e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
@@ -22,15 +22,23 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
-import org.junit.Test;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestMiniJournalCluster {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class);
+
@Test
public void testStartStop() throws IOException {
Configuration conf = new Configuration();
@@ -52,4 +60,92 @@ public class TestMiniJournalCluster {
c.shutdown();
}
}
+
+  /**
+   * Verify that the builder rejects HTTP/RPC port lists whose length differs
+   * from the number of JournalNodes (3 by default), and that a cluster built
+   * with explicitly chosen free ports exposes exactly those ports on each
+   * JournalNode's HTTP and RPC servers.
+   */
+  @Test
+  public void testStartStopWithPorts() throws Exception {
+    Configuration conf = new Configuration();
+
+    // Too few http ports.
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of http ports (1) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
+        });
+
+    // Too few rpc ports.
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of rpc ports (2) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build();
+        });
+
+    // Valid http ports must not mask an invalid number of rpc ports.
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of rpc ports (1) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
+              .build();
+        });
+
+    // Too many http ports.
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class,
+        "Num of http ports (4) should match num of JournalNodes (3)",
+        "MiniJournalCluster port validation failed",
+        () -> {
+          new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
+              .setRpcPorts(8481, 8482, 8483).build();
+        });
+
+    final Set<Integer> httpAndRpcPorts = NetUtils.getFreeSocketPorts(6);
+    LOG.info("Free socket ports: {}", httpAndRpcPorts);
+
+    for (Integer httpAndRpcPort : httpAndRpcPorts) {
+      assertNotEquals("None of the acquired socket ports should be zero", 0,
+          httpAndRpcPort.intValue());
+    }
+
+    // Split the six free ports: the first three iterated become http ports,
+    // the remaining three become rpc ports.
+    final int[] httpPorts = new int[3];
+    final int[] rpcPorts = new int[3];
+    int httpPortIdx = 0;
+    int rpcPortIdx = 0;
+    for (Integer httpAndRpcPort : httpAndRpcPorts) {
+      if (httpPortIdx < 3) {
+        httpPorts[httpPortIdx++] = httpAndRpcPort;
+      } else {
+        rpcPorts[rpcPortIdx++] = httpAndRpcPort;
+      }
+    }
+
+    LOG.info("Http ports selected: {}", httpPorts);
+    LOG.info("Rpc ports selected: {}", rpcPorts);
+
+    try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf)
+        .setHttpPorts(httpPorts)
+        .setRpcPorts(rpcPorts).build()) {
+      miniJournalCluster.waitActive();
+      URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
+      String[] addrs = uri.getAuthority().split(";");
+      assertEquals(3, addrs.length);
+
+      // Each JournalNode must be bound to the ports requested for its index.
+      for (int i = 0; i < 3; i++) {
+        JournalNode journalNode = miniJournalCluster.getJournalNode(i);
+        assertEquals("Http port of JournalNode " + i, httpPorts[i],
+            journalNode.getHttpAddress().getPort());
+        assertEquals("Rpc port of JournalNode " + i, rpcPorts[i],
+            journalNode.getRpcServer().getAddress().getPort());
+      }
+
+      JournalNode node = miniJournalCluster.getJournalNode(0);
+      String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
+      assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
+          dir);
+    }
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org