You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2017/05/28 07:25:35 UTC
hadoop git commit: HDFS-11865. Ozone: Do not initialize Ratis cluster during datanode startup.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 e641bee7b -> c1d714d93
HDFS-11865. Ozone: Do not initialize Ratis cluster during datanode startup.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1d714d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1d714d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1d714d9
Branch: refs/heads/HDFS-7240
Commit: c1d714d93338dbc172114bf176cc821850cb5a65
Parents: e641bee
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Sun May 28 15:19:32 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Sun May 28 15:19:32 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/scm/XceiverClientRatis.java | 28 +-----
.../main/java/org/apache/ratis/RatisHelper.java | 93 ++++++++++++++++++++
.../java/org/apache/ratis/package-info.java | 22 +++++
.../com/google/protobuf/package-info.java | 22 +++++
.../apache/hadoop/ozone/OzoneConfigKeys.java | 7 +-
.../server/ratis/XceiverServerRatis.java | 40 ++++-----
.../transport/server/ratis/package-info.java | 23 +++++
.../apache/hadoop/ozone/MiniOzoneCluster.java | 29 +++---
.../apache/hadoop/ozone/RatisTestHelper.java | 11 +--
.../ozone/container/ContainerTestHelper.java | 26 +++++-
.../ozoneimpl/TestOzoneContainerRatis.java | 34 +++++--
.../transport/server/TestContainerServer.java | 29 ++++--
hadoop-project/pom.xml | 2 +-
13 files changed, 272 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index 738a588..a0ad24e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -22,13 +22,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.ratis.RatisHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
@@ -36,10 +33,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -67,24 +62,6 @@ public final class XceiverClientRatis implements XceiverClientSpi {
this.rpcType = rpcType;
}
- static RaftClient newRaftClient(Pipeline pipeline, RpcType rpcType) {
- final List<RaftPeer> peers = pipeline.getMachines().stream()
- .map(dn -> dn.getXferAddr())
- .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
- .collect(Collectors.toList());
-
- final RaftProperties properties = new RaftProperties();
- final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
- properties, null));
-
- return RaftClient.newBuilder()
- .setClientRpc(factory.newRaftClientRpc())
- .setServers(peers)
- .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
- .setProperties(properties)
- .build();
- }
-
@Override
public Pipeline getPipeline() {
return pipeline;
@@ -92,7 +69,8 @@ public final class XceiverClientRatis implements XceiverClientSpi {
@Override
public void connect() throws Exception {
- if (!client.compareAndSet(null, newRaftClient(pipeline, rpcType))) {
+ if (!client.compareAndSet(null,
+ RatisHelper.newRaftClient(rpcType, getPipeline()))) {
throw new IllegalStateException("Client is already connected.");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
new file mode 100644
index 0000000..bedd9a8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Ratis helper methods.
+ */
+public interface RatisHelper {
+ Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
+
+ static String toRaftPeerIdString(DatanodeID id) {
+ return id.getIpAddr() + ":" + id.getContainerPort();
+ }
+
+ static RaftPeerId toRaftPeerId(DatanodeID id) {
+ return RaftPeerId.valueOf(toRaftPeerIdString(id));
+ }
+
+ static RaftPeer toRaftPeer(String id) {
+ return new RaftPeer(RaftPeerId.valueOf(id), id);
+ }
+
+ static RaftPeer toRaftPeer(DatanodeID id) {
+ return toRaftPeer(toRaftPeerIdString(id));
+ }
+
+ static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
+ return pipeline.getMachines().stream()
+ .map(RatisHelper::toRaftPeer)
+ .collect(Collectors.toList());
+ }
+
+ static RaftPeer[] toRaftPeerArray(Pipeline pipeline) {
+ return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS);
+ }
+
+ static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
+ return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
+ toRaftPeers(pipeline));
+ }
+
+ static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
+ return newRaftClient(rpcType, leader.getId(),
+ new ArrayList<>(Arrays.asList(leader)));
+ }
+
+ static RaftClient newRaftClient(
+ RpcType rpcType, RaftPeerId leader, List<RaftPeer> peers) {
+ LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers);
+ final RaftProperties properties = new RaftProperties();
+ final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
+ properties, null));
+
+ return RaftClient.newBuilder()
+ .setClientRpc(factory.newRaftClientRpc())
+ .setServers(peers)
+ .setLeaderId(leader)
+ .setProperties(properties)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
new file mode 100644
index 0000000..c13c20c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+/**
+ * This package contains classes related to Apache Ratis.
+ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
new file mode 100644
index 0000000..032dd96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.shaded.com.google.protobuf;
+
+/**
+ * This package contains classes related to the shaded protobuf in Apache Ratis.
+ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index feca620..7489054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -83,10 +83,9 @@ public final class OzoneConfigKeys {
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
- public static final String DFS_CONTAINER_RATIS_CONF =
- "dfs.container.ratis.conf";
- public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS =
- "dfs.container.ratis.datanode.address";
+ /** A unique ID to identify a Ratis server. */
+ public static final String DFS_CONTAINER_RATIS_SERVER_ID =
+ "dfs.container.ratis.server.id";
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 4c82ac2..69f3801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -28,23 +26,25 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.Objects;
/**
* Creates a ratis server endpoint that acts as the communication layer for
* Ozone containers.
*/
public final class XceiverServerRatis implements XceiverServerSpi {
+ static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+
static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir) {
final RaftProperties properties = new RaftProperties();
@@ -62,37 +62,31 @@ public final class XceiverServerRatis implements XceiverServerSpi {
Configuration ozoneConf, ContainerDispatcher dispatcher)
throws IOException {
final String id = ozoneConf.get(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS);
- final Collection<String> servers = ozoneConf.getStringCollection(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID);
+ final int port = ozoneConf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
final String storageDir = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
final String rpcType = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
- return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc);
+ return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc);
}
private final int port;
private final RaftServer server;
private XceiverServerRatis(
- String id, Collection<String> servers, String storageDir,
+ String id, int port, String storageDir,
ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
- Preconditions.checkArgument(servers.contains(id),
- "%s is not one of %s specified in %s",
- id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
-
- final List<RaftPeer> peers = servers.stream()
- .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
- .collect(Collectors.toList());
-
- this.port = NetUtils.createSocketAddr(id).getPort();
+ Objects.requireNonNull(id, "id == null");
+ this.port = port;
this.server = RaftServer.newBuilder()
- .setServerId(new RaftPeerId(id))
- .setPeers(peers)
+ .setServerId(RaftPeerId.valueOf(id))
+ .setPeers(Collections.emptyList())
.setProperties(newRaftProperties(rpcType, port, storageDir))
.setStateMachine(new ContainerStateMachine(dispatcher))
.build();
@@ -100,6 +94,8 @@ public final class XceiverServerRatis implements XceiverServerSpi {
@Override
public void start() throws IOException {
+ LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
+ server.getId(), getIPCPort());
server.start();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
new file mode 100644
index 0000000..8debfe0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+/**
+ * This package contains classes for the server implementation
+ * using Apache Ratis
+ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 173b911..5cfcaff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.scm.ScmConfigKeys;
@@ -102,12 +103,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster
if (!useRatis) {
return;
}
- final String[] ids = dnConf.getStrings(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
- // TODO: use the i-th raft server as the i-th datanode address
- // this only work for one Raft cluster
- setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS,
- ids[i]);
+ final String address = ContainerTestHelper.createLocalAddress();
+ setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID,
+ address);
+ setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ String.valueOf(NetUtils.createSocketAddr(address).getPort()));
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
getInstanceStorageDir(i, -1).getCanonicalPath());
}
@@ -206,16 +206,13 @@ public final class MiniOzoneCluster extends MiniDFSCluster
*/
public void waitOzoneReady() throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
- if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY)
- >= numDataNodes) {
- return true;
- }
- LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
- scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
- numDataNodes);
-
- return false;
- }, 1000, 5 * 60 * 1000); //wait for 5 mins.
+ final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY);
+ final boolean isReady = healthy >= numDataNodes;
+ LOG.info("{}. Got {} of {} DN Heartbeats.",
+ isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+ healthy, numDataNodes);
+ return isReady;
+ }, 1000, 60 * 1000); //wait for 1 min.
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index d56fad2..89664eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.stream.Collectors;
/**
* Helpers for Ratis tests.
@@ -34,23 +33,17 @@ import java.util.stream.Collectors;
public interface RatisTestHelper {
Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class);
- static void initRatisConf(
- RpcType rpc, Pipeline pipeline, Configuration conf) {
+ static void initRatisConf(RpcType rpc, Configuration conf) {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+ " = " + rpc.name());
- final String s = pipeline.getMachines().stream()
- .map(dn -> dn.getXferAddr())
- .collect(Collectors.joining(","));
- conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, s);
- LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF + " = " + s);
}
static XceiverClientRatis newXceiverClientRatis(
RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf)
throws IOException {
- initRatisConf(rpcType, pipeline, conf);
+ initRatisConf(rpcType, conf);
return XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index a1abfeb..6db7621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -80,6 +80,11 @@ public final class ContainerTestHelper {
return createPipeline(containerName, 1);
}
+ public static String createLocalAddress() throws IOException {
+ try(ServerSocket s = new ServerSocket(0)) {
+ return "127.0.0.1:" + s.getLocalPort();
+ }
+ }
public static DatanodeID createDatanodeID() throws IOException {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
@@ -100,13 +105,26 @@ public final class ContainerTestHelper {
public static Pipeline createPipeline(String containerName, int numNodes)
throws IOException {
Preconditions.checkArgument(numNodes >= 1);
- final DatanodeID leader = createDatanodeID();
- Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
+ final List<DatanodeID> ids = new ArrayList<>(numNodes);
+ for(int i = 0; i < numNodes; i++) {
+ ids.add(createDatanodeID());
+ }
+ return createPipeline(containerName, ids);
+ }
+
+ public static Pipeline createPipeline(
+ String containerName, Iterable<DatanodeID> ids)
+ throws IOException {
+ Objects.requireNonNull(ids, "ids == null");
+ final Iterator<DatanodeID> i = ids.iterator();
+ Preconditions.checkArgument(i.hasNext());
+ final DatanodeID leader = i.next();
+ final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
pipeline.setContainerName(containerName);
pipeline.addMember(leader);
- for(int i = 1; i < numNodes; i++) {
- pipeline.addMember(createDatanodeID());
+ for(; i.hasNext();) {
+ pipeline.addMember(i.next());
}
return pipeline;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
index 2662909..3adb881 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
@@ -18,23 +18,31 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer;
+import org.apache.ratis.util.CollectionUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* Tests ozone containers with Apache Ratis.
*/
@@ -78,19 +86,31 @@ public class TestOzoneContainerRatis {
throws Exception {
LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes);
+ // create Ozone clusters
final OzoneConfiguration conf = newOzoneConfiguration();
- final String containerName = OzoneUtils.getRequestID();
- final Pipeline pipeline = ContainerTestHelper.createPipeline(
- containerName, numNodes);
- final XceiverClientSpi client = RatisTestHelper.newXceiverClientRatis(
- rpc, pipeline, conf);
-
+ RatisTestHelper.initRatisConf(rpc, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
- .numDataNodes(pipeline.getMachines().size())
+ .numDataNodes(numNodes)
.build();
cluster.waitOzoneReady();
+ final String containerName = OzoneUtils.getRequestID();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName,
+ CollectionUtils.as(datanodes, DataNode::getDatanodeId));
+
+ LOG.info("pipeline=" + pipeline);
+ // Create Ratis cluster
+ final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
+ for(RaftPeer p : peers) {
+ final RaftClient client = RatisHelper.newRaftClient(rpc, p);
+ client.reinitialize(peers, p.getId());
+ }
+
+ LOG.info("reinitialize done");
+ final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
+ pipeline, conf);
try {
test.accept(containerName, client);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
index 5fc6a7c..ad64cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -40,7 +40,11 @@ import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.util.CheckedBiConsumer;
import org.junit.Assert;
import org.junit.Test;
@@ -92,7 +96,8 @@ public class TestContainerServer {
(pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort()),
XceiverClient::new,
- (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()));
+ (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()),
+ (dn, p) -> {});
}
@FunctionalInterface
@@ -115,7 +120,8 @@ public class TestContainerServer {
static XceiverServerRatis newXceiverServerRatis(
DatanodeID dn, OzoneConfiguration conf) throws IOException {
final String id = dn.getXferAddr();
- conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id);
+ conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID, id);
+ conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, dn.getContainerPort());
final String dir = TEST_DIR + id.replace(':', '_');
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
@@ -123,13 +129,22 @@ public class TestContainerServer {
return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
}
+ static void initXceiverServerRatis(
+ RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException {
+ final RaftPeer p = RatisHelper.toRaftPeer(id);
+ final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
+ final RaftClient client = RatisHelper.newRaftClient(rpc, p);
+ client.reinitialize(peers, p.getId());
+ }
+
+
static void runTestClientServerRatis(RpcType rpc, int numNodes)
throws Exception {
runTestClientServer(numNodes,
- (pipeline, conf) -> RatisTestHelper.initRatisConf(
- rpc, pipeline, conf),
+ (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
XceiverClientRatis::newXceiverClientRatis,
- TestContainerServer::newXceiverServerRatis);
+ TestContainerServer::newXceiverServerRatis,
+ (dn, p) -> initXceiverServerRatis(rpc, dn, p));
}
static void runTestClientServer(
@@ -138,7 +153,8 @@ public class TestContainerServer {
CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
IOException> createClient,
CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi,
- IOException> createServer)
+ IOException> createServer,
+ CheckedBiConsumer<DatanodeID, Pipeline, IOException> initServer)
throws Exception {
final List<XceiverServerSpi> servers = new ArrayList<>();
XceiverClientSpi client = null;
@@ -153,6 +169,7 @@ public class TestContainerServer {
final XceiverServerSpi s = createServer.apply(dn, conf);
servers.add(s);
s.start();
+ initServer.accept(dn, pipeline);
}
client = createClient.apply(pipeline, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a5e7720..7d8460a 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -99,7 +99,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
- <ratis.version>0.1-SNAPSHOT</ratis.version>
+ <ratis.version>0.1.1-alpha-SNAPSHOT</ratis.version>
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org