You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/02/28 22:45:32 UTC
[ozone] branch HDDS-2823 updated: HDDS-4718. Bootstrap new SCM node
(#1953)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 920381c HDDS-4718. Bootstrap new SCM node (#1953)
920381c is described below
commit 920381c56412a275d9138726b2b930adad8e18d3
Author: bshashikant <sh...@apache.org>
AuthorDate: Mon Mar 1 04:15:08 2021 +0530
HDDS-4718. Bootstrap new SCM node (#1953)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 4 +-
.../org/apache/hadoop/hdds/scm/AddSCMRequest.java | 113 ++++
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 6 +-
.../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 20 +-
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 32 +
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +-
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +-
.../apache/hadoop/ozone/common/StorageInfo.java | 8 +-
.../common/src/main/resources/ozone-default.xml | 11 +-
.../transport/server/ratis/XceiverServerRatis.java | 3 +
.../scm/protocol/ScmBlockLocationProtocol.java | 6 +
...lockLocationProtocolClientSideTranslatorPB.java | 21 +
.../SCMBlockLocationFailoverProxyProvider.java | 2 +-
.../java/org/apache/hadoop/hdds/utils/HAUtils.java | 25 +-
.../interface-client/src/main/proto/hdds.proto | 10 +
.../src/main/proto/ScmServerProtocol.proto | 3 +
.../hadoop/hdds/scm/ha/InterSCMGrpcClient.java | 4 +-
.../hdds/scm/ha/InterSCMGrpcProtocolService.java | 4 +-
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 11 +
.../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 10 +-
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 4 +
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 40 +-
.../hadoop/hdds/scm/ha/SCMHANodeDetails.java | 24 +-
.../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 20 +-
.../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 3 +
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 327 ++++------
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 79 ++-
...lockLocationProtocolServerSideTranslatorPB.java | 14 +
.../hdds/scm/server/SCMBlockProtocolServer.java | 28 +
.../hadoop/hdds/scm/server/SCMStorageConfig.java | 33 +-
.../hdds/scm/server/StorageContainerManager.java | 47 +-
.../hadoop/hdds/scm/TestHddsServerUtils.java | 13 -
.../hdds/scm/ha/TestReplicationAnnotation.java | 7 +
.../hadoop/ozone/client/TestHddsClientUtils.java | 22 -
.../org/apache/hadoop/ozone/ha/NodeDetails.java | 9 +
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 26 +-
.../apache/hadoop/hdds/scm/TestSCMSnapshot.java | 4 +
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 34 +-
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 89 ++-
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 718 +++++++++++++++++++++
.../hadoop/ozone/MiniOzoneOMHAClusterImpl.java | 353 +---------
.../hadoop/ozone/TestSecureOzoneCluster.java | 7 +
.../hadoop/ozone/TestStorageContainerManager.java | 21 +-
.../client/rpc/TestContainerStateMachine.java | 1 +
.../ozone/scm/TestStorageContainerManagerHA.java | 211 ++++++
.../ozone/om/ScmBlockLocationTestingClient.java | 6 +
46 files changed, 1706 insertions(+), 733 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 6c7bff7..bcf7f30 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -311,8 +311,8 @@ public final class HddsUtils {
public static InetSocketAddress getSingleSCMAddress(
ConfigurationSource conf) {
Collection<InetSocketAddress> singleton = getSCMAddresses(conf);
- Preconditions.checkArgument(singleton.size() == 1,
- MULTIPLE_SCM_NOT_YET_SUPPORTED);
+ // The single-SCM precondition is no longer enforced: if several
+ // addresses are configured, the first one iterated is returned.
return singleton.iterator().next();
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java
new file mode 100644
index 0000000..8133c57
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java
@@ -0,0 +1,113 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Immutable ADD SCM request to be sent by a bootstrapping SCM to the existing
+ * leader SCM; convertible to and from {@code AddScmRequestProto}.
+ */
+public class AddSCMRequest {
+
+ private final String clusterId;
+ private final String scmId;
+ private final String ratisAddr;
+
+ public AddSCMRequest(String clusterId, String scmId, String addr) {
+ this.clusterId = clusterId;
+ this.scmId = scmId;
+ this.ratisAddr = addr;
+ }
+
+ public static AddSCMRequest getFromProtobuf(
+ HddsProtos.AddScmRequestProto proto) {
+ return new Builder().setClusterId(proto.getClusterId())
+ .setScmId(proto.getScmId()).setRatisAddr(proto.getRatisAddr()).build();
+ }
+
+ public HddsProtos.AddScmRequestProto getProtobuf() {
+ return HddsProtos.AddScmRequestProto.newBuilder().setClusterId(clusterId)
+ .setScmId(scmId).setRatisAddr(ratisAddr).build();
+ }
+ /**
+ * Mutable builder for AddSCMRequest; fields never set are passed as null.
+ */
+ public static class Builder {
+ private String clusterId;
+ private String scmId;
+ private String ratisAddr;
+
+
+ /**
+ * Sets the cluster id.
+ * @param cid clusterId to be set
+ * @return Builder for AddSCMRequest
+ */
+ public AddSCMRequest.Builder setClusterId(String cid) {
+ this.clusterId = cid;
+ return this;
+ }
+
+ /**
+ * Sets the scmId.
+ * @param id scmId
+ * @return Builder for AddSCMRequest
+ */
+ public AddSCMRequest.Builder setScmId(String id) {
+ this.scmId = id;
+ return this;
+ }
+
+ /**
+ * Sets the ratis address in Scm HA.
+ * @param addr address in the format of [ip|hostname]:port
+ * @return Builder for AddSCMRequest
+ */
+ public AddSCMRequest.Builder setRatisAddr(String addr) {
+ this.ratisAddr = addr;
+ return this;
+ }
+
+ public AddSCMRequest build() {
+ return new AddSCMRequest(clusterId, scmId, ratisAddr);
+ }
+ }
+
+ /**
+ * Gets the cluster id this request was constructed with.
+ * @return ClusterId
+ */
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ /**
+ * Gets the SCM id this request was constructed with.
+ * @return SCM Id
+ */
+ public String getScmId() {
+ return scmId;
+ }
+
+ public String getRatisAddr() {
+ return ratisAddr;
+ }
+
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 5c77cd9..1ffcda1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -384,7 +384,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_RATIS_PORT_KEY
= "ozone.scm.ratis.port";
public static final int OZONE_SCM_RATIS_PORT_DEFAULT
- = 9864;
+ = 9865;
+ public static final String OZONE_SCM_GRPC_PORT_KEY
+ = "ozone.scm.grpc.port";
+ public static final int OZONE_SCM_GRPC_PORT_DEFAULT
+ = 9866;
public static final String OZONE_SCM_RATIS_RPC_TYPE_KEY
= "ozone.scm.ratis.rpc.type";
public static final String OZONE_SCM_RATIS_RPC_TYPE_DEFAULT
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
index f093e45..ddf008c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -149,12 +149,12 @@ public class SCMHAConfiguration {
@Config(key = "ratis.leader.election.timeout",
type = ConfigType.TIME,
- defaultValue = "1s",
+ defaultValue = "5s",
tags = {SCM, OZONE, HA, RATIS},
description = "The minimum timeout duration for SCM ratis leader" +
- " election. Default is 1s."
+ " election. Default is 5s."
)
- private long ratisLeaderElectionTimeout = 1 * 1000L;
+ private long ratisLeaderElectionTimeout = 5 * 1000L;
@Config(key = "ratis.server.failure.timeout.duration",
type = ConfigType.TIME,
@@ -185,15 +185,6 @@ public class SCMHAConfiguration {
)
private String ratisSnapshotDir;
- @Config(key = "grpc.bind.port",
- type = ConfigType.INT,
- defaultValue = "9899",
- tags = {OZONE, SCM, HA, RATIS},
- description = "Port used by SCM for Grpc Server."
- )
- // TODO: fix the default grpc port
- private int grpcBindPort = 9899;
-
@Config(key = "grpc.deadline.interval",
type = ConfigType.TIME,
defaultValue = "30m",
@@ -206,9 +197,6 @@ public class SCMHAConfiguration {
return grpcDeadlineInterval;
}
- public int getGrpcBindPort() {
- return grpcBindPort;
- }
public String getRatisStorageDir() {
return ratisStorageDir;
@@ -254,10 +242,6 @@ public class SCMHAConfiguration {
this.raftLogPurgeEnabled = enabled;
}
- public void setGrpcBindPort(int port) {
- this.grpcBindPort = port;
- }
-
public int getRaftLogPurgeGap() {
return raftLogPurgeGap;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 28b3dcd..45c0401 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -22,14 +22,19 @@ package org.apache.hadoop.hdds.scm.ha;
import com.google.common.base.Strings;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
@@ -131,4 +136,31 @@ public final class SCMHAUtils {
}
return localScmServiceId;
}
+
+  /**
+   * Returns a copy of the configuration (made with the OzoneConfiguration
+   * copy constructor) whose SCM node list no longer contains the self node.
+   * @param configuration OzoneConfiguration
+   * @return Updated OzoneConfiguration
+   */
+  public static OzoneConfiguration removeSelfId(
+      OzoneConfiguration configuration) {
+    final OzoneConfiguration conf = new OzoneConfiguration(configuration);
+    // The node list is read from the service-id suffixed key, so that same key
+    // (not the bare OZONE_SCM_NODES_KEY) must receive the filtered list.
+    final String scmNodesKey = ConfUtils.addKeySuffixes(
+        ScmConfigKeys.OZONE_SCM_NODES_KEY, getScmServiceId(conf));
+    final String scmNodes = conf.get(scmNodesKey);
+    if (scmNodes != null) {
+      final String selfId = conf.get(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY);
+      final List<String> partsLeft = new ArrayList<>();
+      for (String part : scmNodes.split(",")) {
+        if (!part.equals(selfId)) {
+          partsLeft.add(part);
+        }
+      }
+      conf.set(scmNodesKey, String.join(",", partsLeft));
+    }
+    return conf;
+  }
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a5d3c7d..9d82444 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -38,8 +38,7 @@ public final class OzoneConsts {
public static final String STORAGE_DIR = "scm";
public static final String SCM_ID = "scmUuid";
- public static final String SCM_NODES_INFO = "scmNodesInfo";
- public static final String SCM_NODES_SEPARATOR = "-";
+ public static final String CLUSTER_ID_PREFIX = "CID-";
public static final String OZONE_SIMPLE_ROOT_USER = "root";
public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 102d47a..a4ae55e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -44,7 +44,8 @@ public enum SCMAction implements AuditAction {
START_REPLICATION_MANAGER,
STOP_REPLICATION_MANAGER,
GET_REPLICATION_MANAGER_STATUS,
- GET_CONTAINER_WITH_PIPELINE_BATCH;
+ GET_CONTAINER_WITH_PIPELINE_BATCH,
+ ADD_SCM;
@Override
public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
index c88aaa9..492931d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.ozone.OzoneConsts;
import java.io.File;
import java.io.FileInputStream;
@@ -27,6 +28,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Properties;
+import java.util.UUID;
/**
* Common class for storage information. This class defines the common
@@ -197,11 +199,7 @@ public class StorageInfo {
* @return new clusterID
*/
public static String newClusterID() {
- // TODO:
- // Please check https://issues.apache.org/jira/browse/HDDS-4538
- // hard code clusterID and scmUuid on HDDS-2823,
- // so that multi SCMs won't cause chaos in Datanode side.
- return "CID-1df51ed9-19f1-4283-8f61-5d90a84c196c";
+ return OzoneConsts.CLUSTER_ID_PREFIX + UUID.randomUUID().toString();
}
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8fac16d..d608720 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1890,7 +1890,7 @@
<property>
<name>ozone.scm.ratis.port</name>
- <value>9872</value>
+ <value>9865</value>
<tag>OZONE, SCM, HA, RATIS</tag>
<description>
The port number of the SCM's Ratis server.
@@ -1898,6 +1898,15 @@
</property>
<property>
+ <name>ozone.scm.grpc.port</name>
+ <value>9866</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>
+ The port number of the SCM's grpc server.
+ </description>
+ </property>
+
+ <property>
<name>ozone.scm.ratis.rpc.type</name>
<value>GRPC</value>
<tag>OZONE, SCM, HA, RATIS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index dec60f4..f2c4e7e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -765,12 +765,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
clientId, server.getId(), nextCallId(), group);
RaftClientReply reply;
+ LOG.debug("Received addGroup request for pipeline {}", pipelineID);
+
try {
reply = server.groupManagement(request);
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
}
processReply(reply);
+ LOG.info("Created group {}", pipelineID);
}
@Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index 9c9ba50..9137c6e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.protocol;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.security.KerberosInfo;
@@ -78,6 +79,11 @@ public interface ScmBlockLocationProtocol extends Closeable {
ScmInfo getScmInfo() throws IOException;
/**
+ * Request to add SCM instance to HA group.
+ */
+ boolean addSCM(AddSCMRequest request) throws IOException;
+
+ /**
* Sort datanodes with distance to client.
* @param nodes list of network name of each node.
* @param clientMachine client address, depends, can be hostname or ipaddress.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index a9d974c..0662a81 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesResponseProto;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -248,6 +249,26 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
}
/**
+ * Sends {@code request} as an AddScm call to add an SCM to the HA group.
+ * @return the success flag carried by the AddScm response
+ * @throws IOException
+ */
+ @Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+ HddsProtos.AddScmRequestProto requestProto =
+ request.getProtobuf();
+ HddsProtos.AddScmResponseProto resp;
+ SCMBlockLocationRequest wrapper = createSCMBlockRequest(
+ Type.AddScm)
+ .setAddScmRequestProto(requestProto)
+ .build();
+
+ final SCMBlockLocationResponse wrappedResponse =
+ handleError(submitRequest(wrapper));
+ resp = wrappedResponse.getAddScmResponse();
+ return resp.getSuccess();
+ }
+ /**
* Sort the datanodes based on distance from client.
* @return List<DatanodeDetails></>
* @throws IOException
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index eb90b5a..d982cf5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -143,7 +143,7 @@ public class SCMBlockLocationFailoverProxyProvider implements
nextProxyIndex();
} else {
if (!assignLeaderToNode(newLeader)) {
- LOG.debug("Failing over OM proxy to nodeId: {}", newLeader);
+ LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
nextProxyIndex();
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 1e50364..7d662f6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,17 +17,15 @@
package org.apache.hadoop.hdds.utils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryPolicy;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
/**
* utility class used by SCM and OM for HA.
@@ -40,12 +38,7 @@ public final class HAUtils {
public static ScmInfo getScmInfo(OzoneConfiguration conf)
throws IOException {
try {
- RetryPolicy retryPolicy = retryUpToMaximumCountWithFixedSleep(
- 10, 5, TimeUnit.SECONDS);
- RetriableTask<ScmInfo> retriable = new RetriableTask<>(
- retryPolicy, "getScmInfo",
- () -> getScmBlockClient(conf).getScmInfo());
- return retriable.call();
+ return getScmBlockClient(conf).getScmInfo();
} catch (IOException e) {
throw e;
} catch (Exception e) {
@@ -53,6 +46,18 @@ public final class HAUtils {
}
}
+ public static boolean addSCM(OzoneConfiguration conf, AddSCMRequest request)
+ throws IOException {
+ OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+ try {
+ return getScmBlockClient(config).addSCM(request);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Failed to add SCM", e);
+ }
+ }
+
/**
* Create a scm block client.
*
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 1636328..9b540ad 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -229,6 +229,16 @@ message GetScmInfoResponseProto {
repeated string peerRoles = 3;
}
+message AddScmRequestProto {
+ required string clusterId = 1;
+ required string scmId = 2;
+ required string ratisAddr = 3;
+}
+
+message AddScmResponseProto {
+ required bool success = 1;
+ optional string scmId = 2;
+}
enum ReplicationType {
RATIS = 1;
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 51f2eef..1b73d6a 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -38,6 +38,7 @@ enum Type {
DeleteScmKeyBlocks = 12;
GetScmInfo = 13;
SortDatanodes = 14;
+ AddScm = 15;
}
message SCMBlockLocationRequest {
@@ -54,6 +55,7 @@ message SCMBlockLocationRequest {
optional DeleteScmKeyBlocksRequestProto deleteScmKeyBlocksRequest = 12;
optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
optional SortDatanodesRequestProto sortDatanodesRequest = 14;
+ optional hadoop.hdds.AddScmRequestProto addScmRequestProto = 15;
}
message SCMBlockLocationResponse {
@@ -77,6 +79,7 @@ message SCMBlockLocationResponse {
optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12;
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
optional SortDatanodesResponseProto sortDatanodesResponse = 14;
+ optional hadoop.hdds.AddScmResponseProto addScmResponse = 15;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
index fc347e5..2c4b98f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -55,7 +56,8 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
Preconditions.checkNotNull(conf);
- int port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+ int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
timeout =
conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
NettyChannelBuilder channelBuilder =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
index 9967530..d92220a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -46,7 +47,8 @@ public class InterSCMGrpcProtocolService {
public InterSCMGrpcProtocolService(final ConfigurationSource conf,
final StorageContainerManager scm) {
Preconditions.checkNotNull(conf);
- this.port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+ this.port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
NettyServerBuilder nettyServerBuilder =
((NettyServerBuilder) ServerBuilder.forPort(port))
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 55818c5..76d8c14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -110,6 +111,11 @@ public final class MockSCMHAManager implements SCMHAManager {
ratisServer.stop();
}
+ @Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+ return false;
+ }
+
private class MockRatisServer implements SCMRatisServer {
private Map<RequestType, Object> handlers =
@@ -221,5 +227,10 @@ public final class MockSCMHAManager implements SCMHAManager {
RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
null, new ArrayList<>());
}
+
+ @Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+ return false;
+ }
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index db5ecf4..75db7bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.ha;
import com.google.common.base.Strings;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
@@ -57,7 +58,7 @@ public final class RatisUtil {
// TODO: Check the default values.
final RaftProperties properties = new RaftProperties();
setRaftStorageDir(properties, haConf, conf);
- setRaftRpcProperties(properties, haConf);
+ setRaftRpcProperties(properties, haConf, conf);
setRaftLogProperties(properties, haConf);
setRaftRetryCacheProperties(properties, haConf);
setRaftSnapshotProperties(properties, haConf);
@@ -95,11 +96,12 @@ public final class RatisUtil {
* @param conf SCMHAConfiguration
*/
private static void setRaftRpcProperties(final RaftProperties properties,
- final SCMHAConfiguration conf) {
+ final SCMHAConfiguration conf, ConfigurationSource ozoneConf) {
RaftConfigKeys.Rpc.setType(properties,
RpcType.valueOf(conf.getRatisRpcType()));
- GrpcConfigKeys.Server.setPort(properties,
- conf.getRatisBindAddress().getPort());
+ GrpcConfigKeys.Server.setPort(properties, ozoneConf
+ .getInt(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT));
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf("32m"));
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 4eb2eed..937f0e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.hdds.scm.ha;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
+
import java.io.IOException;
/**
@@ -48,4 +50,6 @@ public interface SCMHAManager {
* Stops the HA service.
*/
void shutdown() throws IOException;
+
+ boolean addSCM(AddSCMRequest request) throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b49ba74..3ca648e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -17,8 +17,12 @@
package org.apache.hadoop.hdds.scm.ha;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
@@ -43,6 +47,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
private final ConfigurationSource conf;
private final SCMDBTransactionBuffer transactionBuffer;
private final SCMSnapshotProvider scmSnapshotProvider;
+ private final StorageContainerManager scm;
// this should ideally be started only in a ratis leader
private final InterSCMGrpcProtocolService grpcServer;
@@ -53,10 +58,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
public SCMHAManagerImpl(final ConfigurationSource conf,
final StorageContainerManager scm) throws IOException {
this.conf = conf;
+ this.scm = scm;
this.transactionBuffer =
new SCMDBTransactionBuffer(scm);
- this.ratisServer = new SCMRatisServerImpl(
- conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
+ this.ratisServer = new SCMRatisServerImpl(conf, scm, transactionBuffer);
this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
scm.getSCMHANodeDetails().getPeerNodeDetails());
grpcServer = new InterSCMGrpcProtocolService(conf, scm);
@@ -69,6 +74,23 @@ public class SCMHAManagerImpl implements SCMHAManager {
@Override
public void start() throws IOException {
ratisServer.start();
+ if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
+ // this is a bootstrapped node
+ // It will first try to add itself to existing ring
+ boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
+ new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
+ .setScmId(scm.getScmId()).setRatisAddr(
+ scm.getSCMHANodeDetails().getLocalNodeDetails()
+ // TODO : Should we use IP instead of hostname??
+ .getRatisHostPortStr()).build());
+ if (!success) {
+ throw new IOException("Adding SCM to existing HA group failed");
+ }
+ } else {
+ LOG.info(" scm role is {} peers {}",
+ ratisServer.getDivision().getInfo().getCurrentRole(),
+ ratisServer.getDivision().getGroup().getPeers());
+ }
grpcServer.start();
}
@@ -153,4 +175,18 @@ public class SCMHAManagerImpl implements SCMHAManager {
ratisServer.stop();
grpcServer.stop();
}
+
+ /**
+  * Adds a new SCM to this node's Ratis group, after checking that the
+  * request carries the same cluster id as the local SCM.
+  *
+  * @param request SCM id, cluster id and Ratis address of the SCM to add
+  * @return the value returned by the Ratis server's addSCM call
+  * @throws IOException if the cluster ids differ, or if the Ratis server
+  *     fails to add the peer
+  */
+ @Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+   // Read the local cluster id once so the value that is compared is also
+   // the value reported in the error message.
+   final String clusterId = scm.getClusterId();
+   if (!request.getClusterId().equals(clusterId)) {
+     throw new IOException(
+         "SCM " + request.getScmId() + " with addr " + request.getRatisAddr()
+             + " has cluster Id " + request.getClusterId()
+             + " but leader SCM cluster id is " + clusterId);
+   }
+   Preconditions.checkNotNull(
+       getRatisServer().getDivision().getGroup().getGroupId());
+   return getRatisServer().addSCM(request);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
index 499f2d5..b4d83e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
@@ -54,6 +54,8 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY;
@@ -80,6 +82,7 @@ public class SCMHANodeDetails {
OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY,
OZONE_SCM_RATIS_PORT_KEY,
+ OZONE_SCM_GRPC_PORT_KEY,
OZONE_SCM_HTTP_BIND_HOST_KEY,
OZONE_SCM_HTTPS_BIND_HOST_KEY,
OZONE_SCM_HTTP_ADDRESS_KEY,
@@ -107,10 +110,14 @@ public class SCMHANodeDetails {
int ratisPort = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+ int grpcPort = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
InetSocketAddress rpcAddress = new InetSocketAddress(
InetAddress.getLocalHost(), 0);
SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
.setRatisPort(ratisPort)
+ .setGrpcPort(grpcPort)
.setRpcAddress(rpcAddress)
.setDatanodeProtocolServerAddress(
HddsServerUtil.getScmDataNodeBindAddress(conf))
@@ -133,6 +140,7 @@ public class SCMHANodeDetails {
String localScmServiceId = null;
String localScmNodeId = null;
int localRatisPort = 0;
+ int localGrpcPort = 0;
Collection<String> scmServiceIds;
@@ -195,6 +203,11 @@ public class SCMHANodeDetails {
serviceId, nodeId);
int ratisPort = conf.getInt(ratisPortKey, OZONE_SCM_RATIS_PORT_DEFAULT);
+ String grpcPortKey = ConfUtils
+ .addKeySuffixes(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, serviceId,
+ nodeId);
+ int grpcPort = conf.getInt(grpcPortKey, OZONE_SCM_GRPC_PORT_DEFAULT);
+
InetSocketAddress addr = null;
try {
addr = NetUtils.createSocketAddr(rpcAddrStr, ratisPort);
@@ -215,10 +228,11 @@ public class SCMHANodeDetails {
localScmServiceId = serviceId;
localScmNodeId = nodeId;
localRatisPort = ratisPort;
+ localGrpcPort = grpcPort;
found++;
} else {
peerNodesList.add(getHASCMNodeDetails(conf, serviceId,
- nodeId, addr, ratisPort));
+ nodeId, addr, ratisPort, grpcPort));
}
}
@@ -232,8 +246,9 @@ public class SCMHANodeDetails {
ConfUtils.setNodeSpecificConfigs(nodeSpecificConfigKeys, conf,
localScmServiceId, localScmNodeId, LOG);
- return new SCMHANodeDetails(getHASCMNodeDetails(conf, localScmServiceId,
- localScmNodeId, localRpcAddress, localRatisPort), peerNodesList);
+ return new SCMHANodeDetails(
+ getHASCMNodeDetails(conf, localScmServiceId, localScmNodeId,
+ localRpcAddress, localRatisPort, localGrpcPort), peerNodesList);
} else if (found > 1) {
throwConfException("Configuration has multiple %s addresses that " +
@@ -253,7 +268,7 @@ public class SCMHANodeDetails {
public static SCMNodeDetails getHASCMNodeDetails(OzoneConfiguration conf,
String localScmServiceId, String localScmNodeId,
- InetSocketAddress rpcAddress, int ratisPort) {
+ InetSocketAddress rpcAddress, int ratisPort, int grpcPort) {
Preconditions.checkNotNull(localScmServiceId);
Preconditions.checkNotNull(localScmNodeId);
@@ -261,6 +276,7 @@ public class SCMHANodeDetails {
builder
.setRpcAddress(rpcAddress)
.setRatisPort(ratisPort)
+ .setGrpcPort(grpcPort)
.setSCMServiceId(localScmServiceId)
.setSCMNodeId(localScmNodeId)
.setBlockProtocolServerAddress(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 2da4146..3f56e6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -36,7 +36,7 @@ public final class SCMNodeDetails extends NodeDetails {
private String clientProtocolServerAddressKey;
private InetSocketAddress datanodeProtocolServerAddress;
private String datanodeAddressKey;
-
+ private int grpcPort;
public static final Logger LOG =
LoggerFactory.getLogger(SCMNodeDetails.class);
@@ -45,15 +45,17 @@ public final class SCMNodeDetails extends NodeDetails {
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private SCMNodeDetails(String serviceId, String nodeId,
- InetSocketAddress rpcAddr, int ratisPort, String httpAddress,
- String httpsAddress, InetSocketAddress blockProtocolServerAddress,
+ InetSocketAddress rpcAddr, int ratisPort, int grpcPort,
+ String httpAddress, String httpsAddress,
+ InetSocketAddress blockProtocolServerAddress,
InetSocketAddress clientProtocolServerAddress,
- InetSocketAddress datanodeProtocolServerAddress,
- RaftGroup group, RaftPeerId selfPeerId, String datanodeAddressKey,
+ InetSocketAddress datanodeProtocolServerAddress, RaftGroup group,
+ RaftPeerId selfPeerId, String datanodeAddressKey,
String blockProtocolServerAddressKey,
String clientProtocolServerAddressAddressKey) {
super(serviceId, nodeId, rpcAddr, ratisPort,
httpAddress, httpsAddress);
+ this.grpcPort = grpcPort;
this.blockProtocolServerAddress = blockProtocolServerAddress;
this.clientProtocolServerAddress = clientProtocolServerAddress;
this.datanodeProtocolServerAddress = datanodeProtocolServerAddress;
@@ -85,6 +87,7 @@ public final class SCMNodeDetails extends NodeDetails {
private String scmNodeId;
private InetSocketAddress rpcAddress;
private int ratisPort;
+ private int grpcPort;
private String httpAddr;
private String httpsAddr;
private InetSocketAddress blockProtocolServerAddress;
@@ -146,6 +149,11 @@ public final class SCMNodeDetails extends NodeDetails {
return this;
}
+ /**
+  * Sets the gRPC port that is handed to the SCMNodeDetails constructor by
+  * build().
+  *
+  * @param port gRPC port number
+  * @return this builder
+  */
+ public Builder setGrpcPort(int port) {
+ this.grpcPort = port;
+ return this;
+ }
+
public Builder setSCMServiceId(String serviceId) {
this.scmServiceId = serviceId;
return this;
@@ -168,7 +176,7 @@ public final class SCMNodeDetails extends NodeDetails {
public SCMNodeDetails build() {
return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress,
- ratisPort, httpAddr, httpsAddr, blockProtocolServerAddress,
+ ratisPort, grpcPort, httpAddr, httpsAddr, blockProtocolServerAddress,
clientProtocolServerAddress, datanodeProtocolServerAddress,
raftGroup, selfPeerId, datanodeAddressKey,
blockProtocolServerAddressKey, clientProtocolServerAddressKey);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index d8a78be..c5a4bae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
@@ -50,4 +51,6 @@ public interface SCMRatisServer {
* Returns NotLeaderException with useful info.
*/
NotLeaderException triggerNotLeaderException();
+
+ /**
+  * Adds the SCM described by the request as a peer of this Ratis server's
+  * group.
+  *
+  * @param request details of the SCM to add
+  * @return true if the SCM was added
+  * @throws IOException if the SCM could not be added
+  */
+ boolean addSCM(AddSCMRequest request) throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 505e152..4157046 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -19,10 +19,6 @@ package org.apache.hadoop.hdds.scm.ha;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -36,12 +32,13 @@ import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
@@ -51,6 +48,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.slf4j.Logger;
@@ -63,52 +61,100 @@ public class SCMRatisServerImpl implements SCMRatisServer {
private static final Logger LOG =
LoggerFactory.getLogger(SCMRatisServerImpl.class);
- private final RaftServer.Division division;
+ private final RaftServer server;
+ private final SCMStateMachine stateMachine;
private final StorageContainerManager scm;
- private final InetSocketAddress address;
private final ClientId clientId = ClientId.randomId();
private final AtomicLong callId = new AtomicLong();
+ private final RaftServer.Division division;
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
- SCMRatisServerImpl(final SCMHAConfiguration haConf,
- final ConfigurationSource conf, final StorageContainerManager scm,
- final DBTransactionBuffer buffer) throws IOException {
+ SCMRatisServerImpl(final ConfigurationSource conf,
+ final StorageContainerManager scm, final DBTransactionBuffer buffer)
+ throws IOException {
this.scm = scm;
- this.address = haConf.getRatisBindAddress();
- RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
- scm.getSCMHANodeDetails(), conf)
- .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+ this.stateMachine = new SCMStateMachine(scm, this, buffer);
+ final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+ LOG.info("starting Raft server for scm:{}", scm.getScmId());
+ // During SCM startup, the bootstrapped node will be started just with
+ // groupId information, so that it won't trigger any leader election
+ // as it doesn't have any peer info.
+
+ // The primary SCM node which is initialized using scm --init command,
+ // will initialize the raft server with the peer info and it will be
+ // persisted in the raft log post leader election. Now, when the primary
+ // scm boots up, it has peer info embedded in the raft log and will
+ // trigger leader election.
+ this.server =
+ newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+ .setGroup(RaftGroup.valueOf(groupId)).build();
+ this.division = server.getDivision(groupId);
+ }
+
+ /**
+  * Starts a temporary Raft server whose group contains only the given SCM,
+  * waits until that group reports a ready leader, and closes the server
+  * again.
+  *
+  * @param clusterId cluster id from which the Raft group id is built
+  * @param scmId id of this SCM, used as the Raft peer id
+  * @param details node details supplying the Ratis host:port of this SCM
+  * @param conf configuration used to build the Raft server properties
+  * @throws IOException if the server cannot be started or closed, or the
+  *     leader is not ready within the wait period
+  */
+ public static void initialize(String clusterId, String scmId,
+     SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+   final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+   RaftServer server = newRaftServer(scmId, conf)
+       .setGroup(group).build();
+   try {
+     server.start();
+     // TODO: Timeout and sleep interval should be made configurable
+     waitforLeaderToBeReady(server, 60000, group);
+   } finally {
+     // Close the server even when start or the wait fails, so that a failed
+     // initialization does not leave it running.
+     server.close();
+   }
+ }
+
+ /**
+  * Polls the division of {@code group} on {@code server} until it reports
+  * that the leader is ready, sleeping 2 seconds between polls.
+  *
+  * @param server Raft server hosting the group
+  * @param timeout maximum time to keep polling, in milliseconds
+  * @param group Raft group whose division is polled
+  * @throws IOException if the leader is not ready within {@code timeout},
+  *     or if the waiting thread is interrupted
+  */
+ private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+     RaftGroup group)
+     throws IOException {
+   boolean ready;
+   long st = Time.monotonicNow();
+   do {
+     ready = server.getDivision(group.getGroupId()).getInfo().isLeaderReady();
+     if (!ready) {
+       try {
+         Thread.sleep(2000);
+       } catch (InterruptedException e) {
+         // Restore the interrupt flag and stop waiting. With the flag set,
+         // every further sleep would throw immediately and the loop would
+         // spin until the timeout expires.
+         Thread.currentThread().interrupt();
+         throw new IOException(String.format(
+             "Interrupted while waiting for Ratis group %s to be ready",
+             group.getGroupId()), e);
+       }
+     }
+   } while (!ready && Time.monotonicNow() - st < timeout);
- this.division =
- server.getDivision(server.getGroups().iterator().next().getGroupId());
+   if (!ready) {
+     throw new IOException(String
+         .format("Ratis group %s is not ready in %d ms", group.getGroupId(),
+             timeout));
+   }
}
- public static RaftServer.Builder newRaftServer(final String clusterId,
- final String scmId, final SCMHANodeDetails haDetails,
- final ConfigurationSource conf) throws IOException {
- final String scmNodeId = haDetails.getLocalNodeDetails().getNodeId();
+ private static RaftServer.Builder newRaftServer(final String scmId,
+ final ConfigurationSource conf) {
final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
- SCMHAGroupBuilder haGrpBuilder = scmNodeId != null ?
- new SCMHAGroupBuilder(haDetails, clusterId, scmId) :
- new SCMHAGroupBuilder(haConf, conf, clusterId);
final RaftProperties serverProperties =
RatisUtil.newRaftProperties(haConf, conf);
- return RaftServer.newBuilder().setServerId(haGrpBuilder.getPeerId())
- .setGroup(haGrpBuilder.getRaftGroup()).setProperties(serverProperties)
- .setStateMachine(new SCMStateMachine());
+ return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
+ .setProperties(serverProperties)
+ .setStateMachine(new SCMStateMachine(false));
}
@Override
public void start() throws IOException {
- division.getRaftServer().start();
+ server.start();
+ }
+
+ @Override
+ public RaftServer.Division getDivision() {
+ return division;
+ }
+
+ @VisibleForTesting
+ public SCMStateMachine getStateMachine() {
+ return stateMachine;
}
@Override
public void registerStateMachineHandler(final RequestType handlerType,
final Object handler) {
- ((SCMStateMachine) division.getStateMachine())
- .registerHandler(handlerType, handler);
+ stateMachine.registerHandler(handlerType, handler);
}
@Override
@@ -116,16 +162,17 @@ public class SCMRatisServerImpl implements SCMRatisServer {
throws IOException, ExecutionException, InterruptedException {
final RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
.setClientId(clientId)
- .setServerId(division.getId())
- .setGroupId(division.getGroup().getGroupId())
+ .setServerId(getDivision().getId())
+ .setGroupId(getDivision().getGroup().getGroupId())
.setCallId(nextCallId())
.setMessage(request.encode())
.setType(RaftClientRequest.writeRequestType())
.build();
final RaftClientReply raftClientReply =
- division.getRaftServer()
- .submitClientRequestAsync(raftClientRequest)
- .get();
+ server.submitClientRequestAsync(raftClientRequest).get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("request {} Reply {}", raftClientRequest, raftClientReply);
+ }
return SCMRatisResponse.decode(raftClientReply);
}
@@ -135,12 +182,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
@Override
public void stop() throws IOException {
- division.getRaftServer().close();
- }
-
- @Override
- public RaftServer.Division getDivision() {
- return division;
+ server.close();
}
@Override
@@ -159,13 +201,47 @@ public class SCMRatisServerImpl implements SCMRatisServer {
division.getMemberId(), null, division.getGroup().getPeers());
}
+ /**
+  * Asks Ratis to reconfigure this division's group so that it contains the
+  * current peers plus the SCM described by {@code request}.
+  *
+  * @param request SCM id and Ratis address of the peer to add
+  * @return {@code true} when Ratis reports that the reconfiguration
+  *     succeeded
+  * @throws IOException if the SetConfiguration call fails or its reply is
+  *     not successful
+  */
+ @Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+   List<RaftPeer> newRaftPeerList =
+       new ArrayList<>(getDivision().getGroup().getPeers());
+   // add the SCM node to be added to the raft peer list
+   RaftPeer raftPeer = RaftPeer.newBuilder().setId(request.getScmId())
+       .setAddress(request.getRatisAddr()).build();
+   newRaftPeerList.add(raftPeer);
+
+   LOG.info("{}: Submitting SetConfiguration request to Ratis server with" +
+       " new SCM peers list: {}", scm.getScmId(),
+       newRaftPeerList);
+   SetConfigurationRequest configRequest =
+       new SetConfigurationRequest(clientId, division.getPeer().getId(),
+           division.getGroup().getGroupId(), nextCallId(), newRaftPeerList);
+
+   final RaftClientReply raftClientReply;
+   try {
+     raftClientReply =
+         division.getRaftServer().setConfiguration(configRequest);
+   } catch (IOException e) {
+     // Report the SCM that was being added, not the local SCM.
+     LOG.error("Failed to update Ratis configuration and add new peer. " +
+         "Cannot add new SCM: {}.", request.getScmId(), e);
+     throw e;
+   }
+
+   if (!raftClientReply.isSuccess()) {
+     // The id and the reply are separate logger arguments; concatenating
+     // the id onto the format string would misfill the placeholders.
+     LOG.error("Failed to add new SCM: {}. Ratis reply: {}",
+         request.getScmId(), raftClientReply);
+     throw new IOException(raftClientReply.getException());
+   }
+   LOG.info("Successfully added new SCM: {}.", request.getScmId());
+   return true;
+ }
+
@VisibleForTesting
public static void validateRatisGroupExists(OzoneConfiguration conf,
String clusterId) throws IOException {
final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
- final RaftGroupId raftGroupId =
- RaftGroupId.valueOf(SCMHAGroupBuilder.buildRaftGroupId(clusterId));
+ final RaftGroupId raftGroupId = buildRaftGroupId(clusterId);
final AtomicBoolean found = new AtomicBoolean(false);
RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
(dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
@@ -199,150 +275,29 @@ public class SCMRatisServerImpl implements SCMRatisServer {
"Could not find any ratis group with id " + raftGroupId);
}
}
- /**
- * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
- * its raft peers should locate on different nodes, and use the same port
- * to communicate with each other.
- *
- * Each of the raft peer figures out its {@link RaftPeerId} by computing
- * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}.
- *
- * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2",
- * scm with ip0 identifies its {@link RaftPeerId} as scm0,
- * scm with ip1 identifies its {@link RaftPeerId} as scm1,
- * scm with ip2 identifies its {@link RaftPeerId} as scm2.
- *
- * After startup, they will form a {@link RaftGroup} with groupID
- * "SCM-HA-Service", and communicate with each other via
- * ozone.scm.ha.ratis.bind.port.
- */
- private static class SCMHAGroupBuilder {
- private static final String SCM_SERVICE_ID = "SCM-HA-Service";
- private final RaftGroupId raftGroupId;
- private final RaftGroup raftGroup;
- private RaftPeerId selfPeerId;
-
- /**
- * @return raft group
- */
- public RaftGroup getRaftGroup() {
- return raftGroup;
- }
-
- /**
- * @return raft group id
- */
- public RaftGroupId getRaftGroupId() {
- return raftGroupId;
- }
-
- /**
- * @return raft peer id
- */
- public RaftPeerId getPeerId() {
- return selfPeerId;
- }
-
- /**
- * This will be used when the SCM HA config are not defined.
- * This will be removed once the HA configs are will defined.
- */
- SCMHAGroupBuilder(final SCMHAConfiguration haConf,
- final ConfigurationSource conf, String clusterId) throws IOException {
- // fetch port
- int port = haConf.getRatisBindAddress().getPort();
-
- // fetch localhost
- InetAddress localHost = InetAddress.getLocalHost();
-
- // fetch hosts from ozone.scm.names
- List<String> hosts = parseHosts(conf);
-
- final List<RaftPeer> raftPeers = new ArrayList<>();
- for (int i = 0; i < hosts.size(); ++i) {
- String nodeId = "scm" + i;
- RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId);
-
- String host = hosts.get(i);
- if (InetAddress.getByName(host).equals(localHost)) {
- selfPeerId = peerId;
- raftPeers.add(RaftPeer.newBuilder()
- .setId(peerId)
- .setAddress(host + ":" + port)
- .build());
- break;
- }
- }
-
- if (selfPeerId == null) {
- String errorMessage = "localhost " + localHost
- + " does not exist in ozone.scm.names "
- + conf.get(ScmConfigKeys.OZONE_SCM_NAMES);
- throw new IOException(errorMessage);
- }
-
- LOG.info("Build a RaftGroup for SCMHA, " +
- "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}",
- localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId);
-
- String groupId = clusterId == null ? SCM_SERVICE_ID : clusterId;
- raftGroupId = RaftGroupId.valueOf(
- UUID.nameUUIDFromBytes(groupId.getBytes(StandardCharsets.UTF_8)));
-
- raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
- }
-
- SCMHAGroupBuilder(final SCMHANodeDetails details, final String clusterId,
- final String scmId) {
-
- Preconditions.checkNotNull(clusterId);
- // RaftGroupId is the clusterId
- raftGroupId = RaftGroupId.valueOf(buildRaftGroupId(clusterId));
- Preconditions.checkNotNull(scmId);
- selfPeerId = RaftPeerId.getRaftPeerId(scmId);
- SCMNodeDetails localNodeDetails = details.getLocalNodeDetails();
-
- InetSocketAddress ratisAddr =
- new InetSocketAddress(localNodeDetails.getInetAddress(),
- localNodeDetails.getRatisPort());
-
- RaftPeer localRaftPeer =
- RaftPeer.newBuilder().setId(selfPeerId).setAddress(ratisAddr).build();
-
- List<RaftPeer> raftPeers = new ArrayList<>();
- // Add this Ratis server to the Ratis ring
- raftPeers.add(localRaftPeer);
- // always start as a standalone server
- raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
- }
-
- private static UUID buildRaftGroupId(String id) {
- return UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8));
- }
-
- private List<String> parseHosts(final ConfigurationSource conf)
- throws UnknownHostException {
- // fetch hosts from ozone.scm.names
- List<String> hosts =
- Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
- .map(scmName -> HddsUtils.getHostName(scmName).get())
- .collect(Collectors.toList());
-
- // if this is not a conf for a multi-server raft cluster,
- // it means we are in integration test, and need to augment
- // the conf to help build a single-server raft cluster.
- if (hosts.size() == 0) {
- // ozone.scm.names is not set
- hosts.add(InetAddress.getLocalHost().getHostName());
- } else if (hosts.size() == 1) {
- // ozone.scm.names is set, yet the conf may not be usable.
- hosts.set(0, InetAddress.getLocalHost().getHostName());
- }
+ /**
+  * Builds a Raft group whose only member is the local SCM.
+  *
+  * @param details node details supplying the Ratis host:port of this SCM
+  * @param scmId id of this SCM, used as the Raft peer id; must not be null
+  * @param clusterId cluster id from which the Raft group id is built
+  * @return a group holding a single peer for this SCM
+  */
+ private static RaftGroup buildRaftGroup(SCMNodeDetails details,
+     String scmId, String clusterId) {
+   Preconditions.checkNotNull(scmId);
+   final RaftGroupId raftGroupId = buildRaftGroupId(clusterId);
+   final RaftPeerId localPeerId = RaftPeerId.getRaftPeerId(scmId);
+
+   // TODO : Should we use IP instead of hostname??
+   final RaftPeer self = RaftPeer.newBuilder()
+       .setId(localPeerId)
+       .setAddress(details.getRatisHostPortStr())
+       .build();
+
+   // The ring starts out with this Ratis server as its only member.
+   final List<RaftPeer> members = new ArrayList<>();
+   members.add(self);
+   return RaftGroup.valueOf(raftGroupId, members);
+ }
- LOG.info("fetch hosts {} from ozone.scm.names {}.",
- hosts, conf.get(ScmConfigKeys.OZONE_SCM_NAMES));
- return hosts;
- }
+ /**
+  * Derives the Raft group id from the cluster id: every occurrence of
+  * OzoneConsts.CLUSTER_ID_PREFIX is removed and the remainder is parsed as
+  * a UUID.
+  *
+  * @param clusterId cluster id; must not be null
+  * @return the Raft group id wrapping the parsed UUID
+  * @throws IllegalArgumentException if the remainder is not a valid UUID
+  *     string (thrown by UUID.fromString)
+  */
+ private static RaftGroupId buildRaftGroupId(String clusterId) {
+ Preconditions.checkNotNull(clusterId);
+ return RaftGroupId.valueOf(
+ UUID.fromString(clusterId.replace(OzoneConsts.CLUSTER_ID_PREFIX, "")));
}
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index d1a7ec5..921a17e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hdds.scm.ha;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -32,17 +33,19 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +62,9 @@ public class SCMStateMachine extends BaseStateMachine {
private SCMRatisServer ratisServer;
private Map<RequestType, Object> handlers;
private DBTransactionBuffer transactionBuffer;
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
+ private final AtomicBoolean isInitialized;
public SCMStateMachine(final StorageContainerManager scm,
final SCMRatisServer ratisServer, DBTransactionBuffer buffer)
@@ -79,9 +85,11 @@ public class SCMStateMachine extends BaseStateMachine {
), SCM_NOT_INITIALIZED);
}
}
+ isInitialized = new AtomicBoolean(true);
}
- public SCMStateMachine() {
+ public SCMStateMachine(boolean init) {
+ isInitialized = new AtomicBoolean(init);
}
public void registerHandler(RequestType type, Object handler) {
handlers.put(type, handler);
@@ -143,6 +151,9 @@ public class SCMStateMachine extends BaseStateMachine {
public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
LOG.info("current leader SCM steps down.");
+ if (!isInitialized.get()) {
+ return;
+ }
scm.getScmContext().updateLeaderAndTerm(false, 0);
scm.getSCMServiceManager().notifyStatusChanged();
}
@@ -155,6 +166,10 @@ public class SCMStateMachine extends BaseStateMachine {
return;
}
+ if (!isInitialized.get()) {
+ return;
+ }
+
long term = scm.getScmHAManager()
.getRatisServer()
.getDivision()
@@ -179,25 +194,47 @@ public class SCMStateMachine extends BaseStateMachine {
long startTime = Time.monotonicNow();
TermIndex lastTermIndex = getLastAppliedTermIndex();
long lastAppliedIndex = lastTermIndex.getIndex();
- TransactionInfo lastAppliedTrxInfo =
- TransactionInfo.fromTermIndex(lastTermIndex);
- if (transactionBuffer.getLatestTrxInfo()
- .compareTo(lastAppliedTrxInfo) < 0) {
+ if (isInitialized.get()) {
+ TransactionInfo lastAppliedTrxInfo =
+ TransactionInfo.fromTermIndex(lastTermIndex);
+ if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
+ < 0) {
+ transactionBuffer.updateLatestTrxInfo(
+ TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
+ .setTransactionIndex(lastTermIndex.getIndex()).build());
+ transactionBuffer.setLatestSnapshot(
+ transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+ } else {
+ lastAppliedIndex =
+ transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+ }
+
+ transactionBuffer.flush();
+ LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+ lastAppliedIndex, Time.monotonicNow() - startTime);
+ }
+ super.takeSnapshot();
+ return lastAppliedIndex;
+ }
+
+ @Override
+ public void notifyTermIndexUpdated(long term, long index) {
+ if (transactionBuffer != null) {
transactionBuffer.updateLatestTrxInfo(
- TransactionInfo.builder()
- .setCurrentTerm(lastTermIndex.getTerm())
- .setTransactionIndex(lastTermIndex.getIndex())
- .build());
- transactionBuffer.setLatestSnapshot(
- transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
- } else {
- lastAppliedIndex =
- transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+ TransactionInfo.builder().setCurrentTerm(term)
+ .setTransactionIndex(index).build());
}
+ // We need to call updateLastApplied here because now in ratis when a
+ // node becomes leader, it is checking stateMachineIndex >=
+ // placeHolderIndex (when a node becomes leader, it writes a conf entry
+ // with some information like its peers and termIndex). So, calling
+ // updateLastApplied updates lastAppliedTermIndex.
+ updateLastAppliedTermIndex(term, index);
+ }
- transactionBuffer.flush();
- LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
- lastAppliedIndex, Time.monotonicNow() - startTime);
- return lastAppliedIndex;
+
+ @Override
+ public void notifyConfigurationChanged(long term, long index,
+ RaftProtos.RaftConfigurationProto newRaftConfiguration) {
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index caec525..3385774 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMB
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -135,6 +136,10 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
response.setGetScmInfoResponse(
getScmInfo(request.getGetScmInfoRequest()));
break;
+ case AddScm:
+ response.setAddScmResponse(
+ getAddSCMResponse(request.getAddScmRequestProto()));
+ break;
case SortDatanodes:
response.setSortDatanodesResponse(sortDatanodes(
request.getSortDatanodesRequest(), request.getVersion()
@@ -221,6 +226,15 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
.build();
}
+ public HddsProtos.AddScmResponseProto getAddSCMResponse(
+ HddsProtos.AddScmRequestProto req)
+ throws IOException {
+ boolean status = impl.addSCM(AddSCMRequest.getFromProtobuf(req));
+ return HddsProtos.AddScmResponseProto.newBuilder()
+ .setSuccess(status)
+ .build();
+ }
+
public SortDatanodesResponseProto sortDatanodes(
SortDatanodesRequestProto request, int clientVersion)
throws ServiceException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index d8014f6..3a1181e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -292,6 +293,33 @@ public class SCMBlockProtocolServer implements
}
@Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+ LOG.debug("Adding SCM {} addr {} cluster id {}",
+ request.getScmId(), request.getRatisAddr(), request.getClusterId());
+
+ Map<String, String> auditMap = Maps.newHashMap();
+ auditMap.put("scmId", String.valueOf(request.getScmId()));
+ auditMap.put("cluster", String.valueOf(request.getClusterId()));
+ auditMap.put("addr", String.valueOf(request.getRatisAddr()));
+ boolean auditSuccess = true;
+ try{
+ return scm.getScmHAManager().addSCM(request);
+ } catch (Exception ex) {
+ auditSuccess = false;
+ AUDIT.logReadFailure(
+ buildAuditMessageForFailure(SCMAction.ADD_SCM, auditMap, ex)
+ );
+ throw ex;
+ } finally {
+ if(auditSuccess) {
+ AUDIT.logReadSuccess(
+ buildAuditMessageForSuccess(SCMAction.ADD_SCM, auditMap)
+ );
+ }
+ }
+ }
+
+ @Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException {
boolean auditSuccess = true;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
index 91b9cd6..013dfe6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
@@ -25,11 +25,10 @@ import org.apache.hadoop.ozone.common.Storage;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
+import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_NODES_INFO;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_NODES_SEPARATOR;
/**
@@ -59,17 +58,6 @@ public class SCMStorageConfig extends Storage {
}
}
- public void setScmNodeInfo(String hostname) throws IOException {
- if (getState() == StorageState.INITIALIZED) {
- throw new IOException("SCM is already initialized.");
- } else {
- final StringBuilder b =
- new StringBuilder(hostname).append(SCM_NODES_SEPARATOR)
- .append(getScmId());
- getStorageInfo().setProperty(SCM_NODES_INFO, b.toString());
- }
- }
-
/**
* Retrieves the SCM ID from the version file.
* @return SCM_ID
@@ -78,31 +66,14 @@ public class SCMStorageConfig extends Storage {
return getStorageInfo().getProperty(SCM_ID);
}
- /**
- * Retrieves the SCM Node info from the version file.
- * @return SCM_NODES
- */
- public String getScmNodeInfo() {
- return getStorageInfo().getProperty(SCM_NODES_INFO);
- }
-
@Override
protected Properties getNodeProperties() {
String scmId = getScmId();
if (scmId == null) {
- // TODO:
- // Please check https://issues.apache.org/jira/browse/HDDS-4538
- // hard code clusterID and scmUuid on HDDS-2823,
- // so that multi SCMs won't cause chaos in Datanode side.
- scmId = "3a11fedb-cce5-46ac-bb0d-cfdf17df9a19";
+ scmId = UUID.randomUUID().toString();
}
Properties scmProperties = new Properties();
scmProperties.setProperty(SCM_ID, scmId);
- String scmNodeInfo = getScmNodeInfo();
- if (scmNodeInfo != null) {
- // FOR NON-HA setup, SCM_NODES can be null
- scmProperties.setProperty(SCM_NODES_INFO, getScmNodeInfo());
- }
return scmProperties;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 24bb893..4a34445 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -131,7 +132,6 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.server.RaftServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -689,9 +689,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
- * Routine to bootstrap the StorageContainerManager. Thsi will connect to a
+ * Routine to bootstrap the StorageContainerManager. This will connect to a
* running SCM instance which has valid cluster id and fetch the cluster id
- * from there. SCM ids will be also be exchanged here.
+ * from there.
*
* TODO: once SCM HA security is enabled, CSR cerificates will be fetched from
* running scm leader instance as well.
@@ -708,14 +708,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
// The node here will try to fetch the cluster id from any of existing
// running SCM instances.
- // TODO: need to avoid failover to local SCM Node here
- final ScmInfo scmInfo = HAUtils.getScmInfo(conf);
+ SCMHANodeDetails.loadSCMHAConfig(conf);
+ OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+ final ScmInfo scmInfo = HAUtils.getScmInfo(config);
SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
final String persistedClusterId = scmStorageConfig.getClusterID();
final String fetchedId = scmInfo.getClusterId();
Preconditions.checkNotNull(fetchedId);
StorageState state = scmStorageConfig.getState();
- if (state != StorageState.INITIALIZED) {
+ if (state == StorageState.INITIALIZED) {
Preconditions.checkNotNull(scmStorageConfig.getScmId());
if (!fetchedId.equals(persistedClusterId)) {
LOG.error(
@@ -757,22 +758,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
if (state != StorageState.INITIALIZED) {
try {
if (clusterId != null && !clusterId.isEmpty()) {
+ // clusterId must be a UUID
+ Preconditions.checkNotNull(UUID.fromString(clusterId));
scmStorageConfig.setClusterId(clusterId);
}
- // TODO: set the scm node info in the version file during upgrade
- // from non-HA SCM to SCM HA set up.
- if (SCMHAUtils.isSCMHAEnabled(conf)) {
- RaftServer server = SCMRatisServerImpl
- .newRaftServer(clusterId, scmStorageConfig.getScmId(), haDetails,
- conf).build();
- // ensure the ratis group exists
- server.start();
- server.close();
- // TODO: Revisit if we need to set the Node info in SCM version file
- scmStorageConfig
- .setScmNodeInfo(haDetails.getLocalNodeDetails().getHostName());
- }
scmStorageConfig.initialize();
+ // TODO: Removing the HA enabled check right now as
+ // when the SCM starts up, it always spins up the ratis
+ // server irrespective of the check. If the ratis server is not
+ // initialized here and starts up during the regular start,
+ // it won't be starting a leader election and hence won't work. The
+ // check will be re-introduced once we have clear segregation path with
+ // ratis enable/disable switch.
+ // if (SCMHAUtils.isSCMHAEnabled(conf)) {
+ SCMRatisServerImpl.initialize(scmStorageConfig.getClusterID(),
+ scmStorageConfig.getScmId(), haDetails.getLocalNodeDetails(), conf);
+ // }
LOG.info("SCM initialization succeeded. Current cluster id for sd={}"
+ "; cid={}; layoutVersion={}; scmId={}",
scmStorageConfig.getStorageDir(), scmStorageConfig.getClusterID(),
@@ -789,7 +790,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
clusterId, scmStorageConfig.getLayoutVersion());
if (SCMHAUtils.isSCMHAEnabled(conf)) {
SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId);
- Preconditions.checkNotNull(scmStorageConfig.getScmNodeInfo());
}
return true;
}
@@ -1169,11 +1169,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
- * Check if the current scm is the leader.
- * @return - if the current scm is the leader.
+ * Check if the current scm is the leader and ready for accepting requests.
+ * @return - if the current scm is the leader and is ready.
*/
public boolean checkLeader() {
- return scmContext.isLeader();
+ return scmContext.isLeader() && getScmHAManager().getRatisServer()
+ .getDivision().getInfo().isLeaderReady();
}
public void checkAdminAccess(String remoteUser) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
index 56d265a..b9efe10 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
@@ -150,19 +150,6 @@ public class TestHddsServerUtils {
}
/**
- * getScmAddressForDataNodes should fail when OZONE_SCM_NAMES has
- * multiple addresses.
- */
- @Test
- public void testClientFailsWithMultipleScmNames() {
- final String scmHost = "host123,host456";
- final OzoneConfiguration conf = new OzoneConfiguration();
- conf.set(OZONE_SCM_NAMES, scmHost);
- thrown.expect(IllegalArgumentException.class);
- HddsServerUtil.getScmAddressForDataNodes(conf);
- }
-
- /**
* Test {@link ServerUtils#getScmDbDir}.
*/
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index d82faa2..a7eb637 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
@@ -73,6 +74,12 @@ public class TestReplicationAnnotation {
public NotLeaderException triggerNotLeaderException() {
return null;
}
+
+ @Override
+ public boolean addSCM(AddSCMRequest request)
+ throws IOException {
+ return false;
+ }
};
scmhaInvocationHandler = new SCMHAInvocationHandler(
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
index 6e1e336..fe4df7d 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
@@ -202,28 +202,6 @@ public class TestHddsClientUtils {
}
@Test
- public void testBlockClientFailsWithMultipleScmNames() {
- // When OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY and OZONE_SCM_CLIENT_ADDRESS_KEY
- // are undefined, fail if OZONE_SCM_NAMES has multiple SCMs.
- final String scmHost = "host123,host456";
- final OzoneConfiguration conf = new OzoneConfiguration();
- conf.set(OZONE_SCM_NAMES, scmHost);
- thrown.expect(IllegalArgumentException.class);
- HddsUtils.getScmAddressForBlockClients(conf);
- }
-
- @Test
- public void testClientFailsWithMultipleScmNames() {
- // When OZONE_SCM_CLIENT_ADDRESS_KEY is undefined, fail if OZONE_SCM_NAMES
- // has multiple SCMs.
- final String scmHost = "host123,host456";
- final OzoneConfiguration conf = new OzoneConfiguration();
- conf.set(OZONE_SCM_NAMES, scmHost);
- thrown.expect(IllegalArgumentException.class);
- HddsUtils.getScmAddressForClients(conf);
- }
-
- @Test
public void testVerifyResourceName() {
final String validName = "my-bucket.01";
HddsClientUtils.verifyResourceName(validName);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
index 76beaa6..c73abb9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
@@ -76,6 +76,15 @@ public class NodeDetails {
return hostPort.toString();
}
+ public String getRatisAddressPortStr() {
+ StringBuilder hostPort = new StringBuilder();
+ hostPort.append(getInetAddress().getHostAddress())
+ .append(":")
+ .append(ratisPort);
+ return hostPort.toString();
+ }
+
+
public int getRatisPort() {
return ratisPort;
}
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 751777c..45680d3 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
/**
* This class causes random failures in the chaos cluster.
*/
-public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
+public class MiniOzoneChaosCluster extends MiniOzoneHAClusterImpl {
static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneChaosCluster.class);
@@ -91,10 +91,10 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
}
public MiniOzoneChaosCluster(OzoneConfiguration conf,
- List<OzoneManager> ozoneManagers, StorageContainerManager scm,
+ List<OzoneManager> ozoneManagers, List<StorageContainerManager> scms,
List<HddsDatanodeService> hddsDatanodes, String omServiceID,
- Set<Class<? extends Failures>> clazzes) {
- super(conf, ozoneManagers, scm, hddsDatanodes, omServiceID);
+ String scmServiceId, Set<Class<? extends Failures>> clazzes) {
+ super(conf, ozoneManagers, scms, hddsDatanodes, omServiceID, scmServiceId);
this.numDatanodes = getHddsDatanodes().size();
this.numOzoneManagers = ozoneManagers.size();
@@ -267,11 +267,17 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
initOMRatisConf();
}
- StorageContainerManager scm;
List<OzoneManager> omList;
+ List<StorageContainerManager> scmList;
try {
- scm = createSCM();
- scm.start();
+ if (numOfSCMs > 1) {
+ throw new IOException("no implemented now");
+ // TODO: do later
+ } else {
+ StorageContainerManager scm = createSCM();
+ scm.start();
+ scmList = Arrays.asList(scm);
+ }
if (numOfOMs > 1) {
omList = createOMService();
} else {
@@ -284,11 +290,11 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
- scm, null);
+ scmList, null);
MiniOzoneChaosCluster cluster =
- new MiniOzoneChaosCluster(conf, omList, scm, hddsDatanodes,
- omServiceId, clazzes);
+ new MiniOzoneChaosCluster(conf, omList, scmList, hddsDatanodes,
+ omServiceId, scmServiceId, clazzes);
if (startDataNodes) {
cluster.startHddsDatanodes();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
index ac8b00d..463d4b3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.UUID;
+
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
@@ -44,6 +46,7 @@ public class TestSCMSnapshot {
@BeforeClass
public static void setup() throws Exception {
conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
SCMHAConfiguration scmhaConfiguration = conf.getObject(
SCMHAConfiguration.class);
scmhaConfiguration.setRatisSnapshotThreshold(1L);
@@ -51,6 +54,7 @@ public class TestSCMSnapshot {
cluster = MiniOzoneCluster
.newBuilder(conf)
.setNumDatanodes(3)
+ .setScmId(UUID.randomUUID().toString())
.build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index c3d9815..153b312 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -64,6 +64,10 @@ public interface MiniOzoneCluster {
return new MiniOzoneOMHAClusterImpl.Builder(conf);
}
+ static Builder newHABuilder(OzoneConfiguration conf) {
+ return new MiniOzoneHAClusterImpl.Builder(conf);
+ }
+
/**
* Returns the configuration object associated with the MiniOzoneCluster.
*
@@ -115,7 +119,15 @@ public interface MiniOzoneCluster {
*
* @return Service ID String
*/
- String getServiceId();
+ String getOMServiceId();
+
+
+ /**
+ * Returns StorageContainerManager Service ID.
+ *
+ * @return Service ID String
+ */
+ String getSCMServiceId();
/**
* Returns {@link StorageContainerManager} associated with this
@@ -274,6 +286,7 @@ public interface MiniOzoneCluster {
protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
protected static final int ACTIVE_OMS_NOT_SET = -1;
+ protected static final int ACTIVE_SCMS_NOT_SET = -1;
protected static final int DEFAULT_PIPELIME_LIMIT = 3;
protected static final int DEFAULT_RATIS_RPC_TIMEOUT_SEC = 1;
@@ -285,6 +298,10 @@ public interface MiniOzoneCluster {
protected int numOfOMs;
protected int numOfActiveOMs = ACTIVE_OMS_NOT_SET;
+ protected String scmServiceId;
+ protected int numOfSCMs;
+ protected int numOfActiveSCMs = ACTIVE_SCMS_NOT_SET;
+
protected Optional<Boolean> enableTrace = Optional.of(false);
protected Optional<Integer> hbInterval = Optional.empty();
protected Optional<Integer> hbProcessorInterval = Optional.empty();
@@ -524,6 +541,21 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setNumOfStorageContainerManagers(int numSCMs) {
+ this.numOfSCMs = numSCMs;
+ return this;
+ }
+
+ public Builder setNumOfActiveSCMs(int numActiveSCMs) {
+ this.numOfActiveSCMs = numActiveSCMs;
+ return this;
+ }
+
+ public Builder setSCMServiceId(String serviceId) {
+ this.scmServiceId = serviceId;
+ return this;
+ }
+
/**
* Constructs and returns MiniOzoneCluster.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 757c6b8..c2ecd34 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
+import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -43,6 +44,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
@@ -153,26 +157,58 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
this.reconServer = reconServer;
}
+ /**
+ * Creates a new MiniOzoneCluster without the OzoneManager and
+ * StorageContainerManager. This is used by
+ * {@link MiniOzoneHAClusterImpl} for starting multiple
+ * OzoneManagers and StorageContainerManagers.
+ *
+ * @param conf
+ * @param hddsDatanodes
+ */
+ MiniOzoneClusterImpl(OzoneConfiguration conf,
+ List<HddsDatanodeService> hddsDatanodes, ReconServer reconServer) {
+ this.conf = conf;
+ this.hddsDatanodes = hddsDatanodes;
+ this.reconServer = reconServer;
+ }
+
public OzoneConfiguration getConf() {
return conf;
}
- public String getServiceId() {
+ public String getOMServiceId() {
+ // Non-HA cluster doesn't have OM Service Id.
+ return null;
+ }
+
+ public String getSCMServiceId() {
// Non-HA cluster doesn't have OM Service Id.
return null;
}
+ public void waitForSCMToBeReady() throws TimeoutException,
+ InterruptedException {
+ // Nothing implemented here
+ }
+
+ public StorageContainerManager getActiveSCM() {
+ return scm;
+ }
+
/**
* Waits for the Ozone cluster to be ready for processing requests.
*/
@Override
public void waitForClusterToBeReady()
throws TimeoutException, InterruptedException {
+ waitForSCMToBeReady();
GenericTestUtils.waitFor(() -> {
- final int healthy = scm.getNodeCount(HEALTHY);
+ StorageContainerManager activeScm = getActiveSCM();
+ final int healthy = activeScm.getNodeCount(HEALTHY);
final boolean isNodeReady = healthy == hddsDatanodes.size();
- final boolean exitSafeMode = !scm.isInSafeMode();
- final boolean checkScmLeader = scm.checkLeader();
+ final boolean exitSafeMode = !activeScm.isInSafeMode();
+ final boolean checkScmLeader = activeScm.checkLeader();
LOG.info("{}. Got {} of {} DN Heartbeats.",
isNodeReady ? "Nodes are ready" : "Waiting for nodes to be ready",
@@ -383,13 +419,17 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
shutdownHddsDatanode(getHddsDatanodeIndex(dn));
}
+ public String getClusterId() throws IOException {
+ return scm.getClientProtocolServer().getScmInfo().getClusterId();
+ }
+
@Override
public void shutdown() {
try {
LOG.info("Shutting down the Mini Ozone Cluster");
File baseDir = new File(GenericTestUtils.getTempPath(
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
- scm.getClientProtocolServer().getScmInfo().getClusterId()));
+ getClusterId()));
stop();
FileUtils.deleteDirectory(baseDir);
ContainerCache.getInstance(conf).shutdownCache();
@@ -538,7 +578,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
reconServer.execute(new String[] {});
}
- hddsDatanodes = createHddsDatanodes(scm, reconServer);
+ hddsDatanodes = createHddsDatanodes(
+ Collections.singletonList(scm), reconServer);
MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
hddsDatanodes, reconServer);
@@ -578,6 +619,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
Path metaDir = Paths.get(path, "ozone-meta");
Files.createDirectories(metaDir);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
if (!chunkSize.isPresent()) {
//set it to 1MB by default in tests
chunkSize = Optional.of(1);
@@ -649,7 +691,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
return scm;
}
- private void initializeScmStorage(SCMStorageConfig scmStore)
+ protected void initializeScmStorage(SCMStorageConfig scmStore)
throws IOException {
if (scmStore.getState() == StorageState.INITIALIZED) {
return;
@@ -659,10 +701,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
scmId = Optional.of(UUID.randomUUID().toString());
}
scmStore.setScmId(scmId.get());
- // TODO : Ratis is enabled by default for mini ozone cluster. Set scm node
- // to local host in the beginning. Fix it MiniozoneSCM HA cluster
- scmStore.setScmNodeInfo("localhost");
scmStore.initialize();
+ if (SCMHAUtils.isSCMHAEnabled(conf)) {
+ SCMRatisServerImpl.initialize(clusterId, scmId.get(),
+ SCMHANodeDetails.loadSCMHAConfig(conf).getLocalNodeDetails(), conf);
+ }
}
void initializeOmStorage(OMStorage omStorage) throws IOException {
@@ -670,7 +713,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
return;
}
omStorage.setClusterId(clusterId);
- omStorage.setScmId(scmId.get());
+ if (scmId.isPresent()) {
+ omStorage.setScmId(scmId.get());
+ }
omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
// Initialize ozone certificate client if security is enabled.
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -693,6 +738,21 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
return OzoneManager.createOm(conf);
}
+ protected String getSCMAddresses(List<StorageContainerManager> scms) {
+ StringBuilder stringBuilder = new StringBuilder();
+ Iterator<StorageContainerManager> iter = scms.iterator();
+
+ while (iter.hasNext()) {
+ StorageContainerManager scm = iter.next();
+ stringBuilder.append(scm.getDatanodeRpcAddress().getHostString() +
+ ":" + scm.getDatanodeRpcAddress().getPort());
+ if (iter.hasNext()) {
+ stringBuilder.append(",");
+ }
+
+ }
+ return stringBuilder.toString();
+ }
/**
* Creates HddsDatanodeService(s) instance.
*
@@ -700,11 +760,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
* @throws IOException
*/
protected List<HddsDatanodeService> createHddsDatanodes(
- StorageContainerManager scm, ReconServer reconServer)
+ List<StorageContainerManager> scms, ReconServer reconServer)
throws IOException {
configureHddsDatanodes();
- String scmAddress = scm.getDatanodeRpcAddress().getHostString() +
- ":" + scm.getDatanodeRpcAddress().getPort();
+ String scmAddress = getSCMAddresses(scms);
String[] args = new String[] {};
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
@@ -747,7 +806,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
return hddsDatanodes;
}
- private void configureSCM() {
+ protected void configureSCM() {
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
new file mode 100644
index 0000000..5cf9d9b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -0,0 +1,718 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.recon.ReconServer;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster
+ * with OM HA and SCM HA suitable for running tests.
+ * The cluster consists of a set of
+ * OzoneManagers, StorageContainerManagers and multiple DataNodes.
+ */
+public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
+
+ // Logger for this cluster implementation.
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
+
+ // Holders for the OM and SCM instances (active and inactive) of the cluster.
+ private final OMHAService omhaService;
+ private final SCMHAService scmhaService;
+
+ // Timeout handed to GenericTestUtils.waitFor when waiting for an OM restart
+ // or for an SCM leader.
+ private int waitForClusterToBeReadyTimeout = 120000; // 2 min
+
+ // Source of the randomized base ports used when creating OM/SCM services.
+ private static final Random RANDOM = new Random();
+ // Lowered OM Ratis timeouts applied by Builder#initOMRatisConf when the test
+ // did not configure its own values.
+ private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second
+ public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
+
+ /**
+  * Wraps already-created OM, SCM and datanode services into an HA cluster
+  * handle.
+  *
+  * @param conf cluster configuration
+  * @param activeOMList OMs that are considered active
+  * @param inactiveOMList OMs that are considered inactive
+  * @param activeSCMList SCMs that are considered active
+  * @param inactiveSCMList SCMs that are considered inactive
+  * @param hddsDatanodes datanode services of the cluster
+  * @param omServiceId OM service id
+  * @param scmServiceId SCM service id
+  * @param reconServer Recon server handed to the parent cluster
+  */
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ public MiniOzoneHAClusterImpl(
+     OzoneConfiguration conf,
+     List<OzoneManager> activeOMList,
+     List<OzoneManager> inactiveOMList,
+     List<StorageContainerManager> activeSCMList,
+     List<StorageContainerManager> inactiveSCMList,
+     List<HddsDatanodeService> hddsDatanodes,
+     String omServiceId,
+     String scmServiceId,
+     ReconServer reconServer) {
+   super(conf, hddsDatanodes, reconServer);
+   this.omhaService = new OMHAService(
+       activeOMList, inactiveOMList, omServiceId);
+   this.scmhaService = new SCMHAService(
+       activeSCMList, inactiveSCMList, scmServiceId);
+ }
+
+ /**
+  * Convenience constructor that passes no inactive OM list, no inactive SCM
+  * list and no Recon server to the main constructor.
+  * According to the original comment this is used by MiniOzoneChaosCluster.
+  *
+  * @param conf cluster configuration
+  * @param omList OMs of the cluster, all treated as active
+  * @param scmList SCMs of the cluster, all treated as active
+  * @param hddsDatanodes datanode services of the cluster
+  * @param omServiceId OM service id
+  * @param scmServiceId SCM service id
+  */
+ protected MiniOzoneHAClusterImpl(
+     OzoneConfiguration conf,
+     List<OzoneManager> omList,
+     List<StorageContainerManager> scmList,
+     List<HddsDatanodeService> hddsDatanodes,
+     String omServiceId,
+     String scmServiceId) {
+   this(conf,
+       omList, null,
+       scmList, null,
+       hddsDatanodes, omServiceId, scmServiceId, null);
+ }
+
+ /**
+  * @return the service id the OM service holder was created with
+  */
+ @Override
+ public String getOMServiceId() {
+   return this.omhaService.getServiceId();
+ }
+
+ /**
+  * @return the service id the SCM service holder was created with
+  */
+ @Override
+ public String getSCMServiceId() {
+   return this.scmhaService.getServiceId();
+ }
+
+ /**
+  * Returns the OzoneManager at position 0 of the OM service list.
+  *
+  * @return the first entry of {@code omhaService.getServices()}
+  */
+ @Override
+ public OzoneManager getOzoneManager() {
+   final List<OzoneManager> allOMs = omhaService.getServices();
+   return allOMs.get(0);
+ }
+
+ /**
+  * Creates an RPC client for this cluster.
+  *
+  * @return a client bound to the OM service id when one is set (HA cluster),
+  *         otherwise a client created from the configuration alone
+  * @throws IOException if the client factory fails
+  */
+ @Override
+ public OzoneClient getRpcClient() throws IOException {
+   final String serviceId = omhaService.getServiceId();
+   // A missing service id means a non-HA cluster.
+   return serviceId == null
+       ? OzoneClientFactory.getRpcClient(getConf())
+       : OzoneClientFactory.getRpcClient(serviceId, getConf());
+ }
+
+ /**
+  * @param omNodeId node id of the OM to look up
+  * @return whether the OM registered under that id is in the active list
+  */
+ public boolean isOMActive(String omNodeId) {
+   final boolean active = omhaService.isServiceActive(omNodeId);
+   return active;
+ }
+
+ /**
+  * @param index position in the OM service list
+  * @return the OzoneManager stored at that position
+  */
+ public OzoneManager getOzoneManager(int index) {
+   return omhaService.getServiceByIndex(index);
+ }
+
+ /**
+  * @param omNodeId node id of the OM to look up
+  * @return the OzoneManager registered under that id, or {@code null} when
+  *         the id is unknown
+  */
+ public OzoneManager getOzoneManager(String omNodeId) {
+   return omhaService.getServiceById(omNodeId);
+ }
+
+ /**
+  * @return the list of all OzoneManagers held by the OM service holder
+  */
+ public List<OzoneManager> getOzoneManagersList() {
+   return this.omhaService.getServices();
+ }
+
+ /**
+  * Looks for the single OzoneManager that reports itself as ready leader.
+  *
+  * @return the leader, or {@code null} when no OM - or more than one OM -
+  *         reports leader-ready; callers are expected to retry later
+  */
+ public OzoneManager getOMLeader() {
+   OzoneManager leader = null;
+   for (OzoneManager candidate : omhaService.getServices()) {
+     if (!candidate.isLeaderReady()) {
+       continue;
+     }
+     if (leader != null) {
+       // A second leader showed up: the view is ambiguous right now.
+       return null;
+     }
+     leader = candidate;
+   }
+   return leader;
+ }
+
+ /**
+  * Starts an OM that is currently in the inactive list and moves it to the
+  * active list.
+  *
+  * @param omNodeID node id of the OM to start
+  * @throws IOException if the OM is not in the inactive list or fails to
+  *         start
+  */
+ public void startInactiveOM(String omNodeID) throws IOException {
+   omhaService.startInactiveService(omNodeID, om -> om.start());
+ }
+
+ /**
+  * Stops and then restarts every OzoneManager of the cluster, one after the
+  * other.
+  *
+  * @throws IOException if a restart fails
+  */
+ @Override
+ public void restartOzoneManager() throws IOException {
+   final List<OzoneManager> allOMs = omhaService.getServices();
+   for (OzoneManager om : allOMs) {
+     om.stop();
+     om.restart();
+   }
+ }
+
+ /**
+  * Stops the given OzoneManager. Unlike {@code stopOzoneManager}, this does
+  * not call {@code join()} on it afterwards.
+  *
+  * @param ozoneManager the OM to stop
+  */
+ public void shutdownOzoneManager(OzoneManager ozoneManager) {
+   // Parameterized logging, consistent with stop() in this class.
+   LOG.info("Shutting down OzoneManager {}", ozoneManager.getOMNodeId());
+
+   ozoneManager.stop();
+ }
+
+ /**
+  * Restarts the given OzoneManager and optionally waits until it reports
+  * itself as running.
+  *
+  * @param ozoneManager the OM to restart
+  * @param waitForOM whether to poll {@code isRunning()} after the restart
+  * @throws IOException if the restart fails
+  * @throws TimeoutException if the OM is not running before
+  *         {@code waitForClusterToBeReadyTimeout} elapses
+  * @throws InterruptedException if interrupted while waiting
+  */
+ public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
+     throws IOException, TimeoutException, InterruptedException {
+   // Parameterized logging, consistent with stop() in this class.
+   LOG.info("Restarting OzoneManager {}", ozoneManager.getOMNodeId());
+   ozoneManager.restart();
+
+   if (waitForOM) {
+     GenericTestUtils.waitFor(ozoneManager::isRunning,
+         1000, waitForClusterToBeReadyTimeout);
+   }
+ }
+
+ /**
+  * Reads the cluster id from the SCM info reported by the first SCM in the
+  * SCM service list.
+  *
+  * @return the cluster id
+  * @throws IOException if the SCM info cannot be obtained
+  */
+ public String getClusterId() throws IOException {
+   final StorageContainerManager firstScm =
+       scmhaService.getServices().get(0);
+   return firstScm.getClientProtocolServer().getScmInfo().getClusterId();
+ }
+
+ public StorageContainerManager getActiveSCM() {
+ for (StorageContainerManager scm : scmhaService.getServices()) {
+ if (scm.checkLeader()) {
+ return scm;
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Polls once per second until some SCM reports being leader.
+  *
+  * @throws TimeoutException if no SCM is leader before
+  *         {@code waitForClusterToBeReadyTimeout} elapses
+  * @throws InterruptedException if interrupted while waiting
+  */
+ public void waitForSCMToBeReady()
+     throws TimeoutException, InterruptedException {
+   // getActiveSCM() performs the same "first SCM with checkLeader()" scan.
+   GenericTestUtils.waitFor(() -> getActiveSCM() != null,
+       1000, waitForClusterToBeReadyTimeout);
+ }
+
+ /**
+  * Stops and joins every OzoneManager, then every StorageContainerManager,
+  * and finally delegates to the parent cluster's {@code stop()}.
+  */
+ @Override
+ public void stop() {
+   for (OzoneManager om : omhaService.getServices()) {
+     if (om == null) {
+       continue;
+     }
+     LOG.info("Stopping the OzoneManager {}", om.getOMNodeId());
+     om.stop();
+     om.join();
+   }
+
+   for (StorageContainerManager scm : scmhaService.getServices()) {
+     if (scm == null) {
+       continue;
+     }
+     LOG.info("Stopping the StorageContainerManager {}", scm.getScmId());
+     scm.stop();
+     scm.join();
+   }
+
+   super.stop();
+ }
+
+ /**
+  * Stops the OzoneManager at the given list position and joins it.
+  *
+  * @param index position in the OM service list
+  */
+ public void stopOzoneManager(int index) {
+   final OzoneManager om = omhaService.getServices().get(index);
+   om.stop();
+   om.join();
+ }
+
+ /**
+  * Stops the OzoneManager registered under the given node id and joins it.
+  *
+  * @param omNodeId node id of the OM to stop
+  */
+ public void stopOzoneManager(String omNodeId) {
+   final OzoneManager om = omhaService.getServiceById(omNodeId);
+   om.stop();
+   om.join();
+ }
+
+ /**
+ * Builder for configuring the MiniOzoneCluster to run.
+ */
+ public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+ // OM node ids are this prefix followed by a 1-based index.
+ private static final String OM_NODE_ID_PREFIX = "omNode-";
+ // OMs started by createOMService() / OMs created but left stopped.
+ private List<OzoneManager> activeOMs = new ArrayList<>();
+ private List<OzoneManager> inactiveOMs = new ArrayList<>();
+
+ // SCM node ids are this prefix followed by a 1-based index.
+ private static final String SCM_NODE_ID_PREFIX = "scmNode-";
+ // SCMs started by createSCMService() / SCMs created but left stopped.
+ private List<StorageContainerManager> activeSCMs = new ArrayList<>();
+ private List<StorageContainerManager> inactiveSCMs = new ArrayList<>();
+
+ /**
+  * Creates a builder that works on the given configuration.
+  *
+  * @param conf configuration handed to the parent builder
+  */
+ public Builder(final OzoneConfiguration conf) {
+   super(conf);
+ }
+
+ /**
+  * @return the builder's list of active OMs (the live list, not a copy)
+  */
+ public List<OzoneManager> getActiveOMs() {
+   return this.activeOMs;
+ }
+
+ /**
+  * @return the builder's list of inactive OMs (the live list, not a copy)
+  */
+ public List<OzoneManager> getInactiveOMs() {
+   return this.inactiveOMs;
+ }
+
+ /**
+  * Builds the HA cluster: initializes the configuration, creates the SCM
+  * and OM services, optionally starts Recon, creates the datanodes and
+  * optionally starts them.
+  *
+  * @return the assembled cluster
+  * @throws IllegalArgumentException if more active OMs than OMs were
+  *         requested
+  * @throws IOException if a service cannot be created; an
+  *         AuthenticationException is wrapped into an IOException
+  */
+ @Override
+ public MiniOzoneCluster build() throws IOException {
+   if (numOfActiveOMs > numOfOMs) {
+     throw new IllegalArgumentException("Number of active OMs cannot be " +
+         "more than the total number of OMs");
+   }
+
+   // If num of ActiveOMs is not set, set it to numOfOMs.
+   if (numOfActiveOMs == ACTIVE_OMS_NOT_SET) {
+     numOfActiveOMs = numOfOMs;
+   }
+
+   // If num of ActiveSCMs is not set, set it to numOfSCMs.
+   if (numOfActiveSCMs == ACTIVE_SCMS_NOT_SET) {
+     numOfActiveSCMs = numOfSCMs;
+   }
+
+   DefaultMetricsSystem.setMiniClusterMode(true);
+   initializeConfiguration();
+   initOMRatisConf();
+   ReconServer reconServer = null;
+   try {
+     // SCMs are created before OMs; both populate the active/inactive
+     // lists of this builder as a side effect.
+     createSCMService();
+     createOMService();
+     if (includeRecon) {
+       configureRecon();
+       reconServer = new ReconServer();
+       reconServer.execute(new String[] {});
+     }
+   } catch (AuthenticationException ex) {
+     throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+   }
+
+   final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
+       activeSCMs, reconServer);
+
+   MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf,
+       activeOMs, inactiveOMs, activeSCMs, inactiveSCMs,
+       hddsDatanodes, omServiceId, scmServiceId, reconServer);
+
+   if (startDataNodes) {
+     cluster.startHddsDatanodes();
+   }
+   return cluster;
+ }
+
+ /**
+  * Enables OM Ratis, sets the OM handler count and lowers the Ratis RPC and
+  * node-failure timeouts for tests, unless the test already configured
+  * values that differ from the defaults.
+  */
+ protected void initOMRatisConf() {
+   conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+   conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
+
+   // If test change the following config values we will respect,
+   // otherwise we will set lower timeout values.
+   // NOTE(review): the default's raw duration is compared with a value read
+   // in milliseconds without unit conversion - confirm the default's unit.
+   long defaultDuration = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+       .getDuration();
+   long curRatisRpcTimeout = conf.getTimeDuration(
+       OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
+       defaultDuration, TimeUnit.MILLISECONDS);
+   conf.setTimeDuration(OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
+       defaultDuration == curRatisRpcTimeout ?
+           RATIS_RPC_TIMEOUT : curRatisRpcTimeout, TimeUnit.MILLISECONDS);
+
+   // Convert the default to milliseconds first, so that the value read
+   // back, the comparison and the value written all share one unit.
+   // Previously the current value was read in the default's own unit but
+   // written back as milliseconds, which corrupts a test-provided override
+   // whenever the default's unit is not milliseconds.
+   TimeUnit defaultNodeFailureUnit =
+       OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+           .getUnit();
+   long defaultNodeFailureTimeout = defaultNodeFailureUnit.toMillis(
+       OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+           .getDuration());
+   long curNodeFailureTimeout = conf.getTimeDuration(
+       OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+       defaultNodeFailureTimeout,
+       TimeUnit.MILLISECONDS);
+   conf.setTimeDuration(
+       OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+       curNodeFailureTimeout == defaultNodeFailureTimeout ?
+           NODE_FAILURE_TIMEOUT : curNodeFailureTimeout,
+       TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Creates the OM service with {@code numOfOMs} OzoneManagers and starts
+  * the first {@code numOfActiveOMs} of them. Ports are derived from a
+  * random base port; on a {@link BindException} everything created so far
+  * is stopped and the whole attempt is repeated with a new base port.
+  *
+  * @return all OzoneManagers created by the successful attempt
+  * @throws IOException if an OM cannot be initialized or started for a
+  *         reason other than a port conflict
+  * @throws AuthenticationException if OM creation fails to authenticate
+  */
+ protected List<OzoneManager> createOMService() throws IOException,
+     AuthenticationException {
+
+   List<OzoneManager> omList = Lists.newArrayList();
+
+   int retryCount = 0;
+
+   while (true) {
+     try {
+       int basePort = 10000 + RANDOM.nextInt(1000) * 4;
+       initOMHAConfig(basePort);
+
+       for (int i = 1; i <= numOfOMs; i++) {
+         // Set nodeId
+         String nodeId = OM_NODE_ID_PREFIX + i;
+         OzoneConfiguration config = new OzoneConfiguration(conf);
+         config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+         // Clear the OM http(s) address so that the cluster picks up the
+         // address set with service ID and node ID in initOMHAConfig
+         config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
+         config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
+
+         // Set metadata/DB dir base path
+         String metaDirPath = path + "/" + nodeId;
+         config.set(OZONE_METADATA_DIRS, metaDirPath);
+         OzoneManager.omInit(config);
+         OzoneManager om = OzoneManager.createOm(config);
+         if (certClient != null) {
+           om.setCertClient(certClient);
+         }
+         omList.add(om);
+
+         if (i <= numOfActiveOMs) {
+           om.start();
+           activeOMs.add(om);
+           LOG.info("Started OzoneManager RPC server at {}",
+               om.getOmRpcServerAddr());
+         } else {
+           inactiveOMs.add(om);
+           LOG.info("Intialized OzoneManager at {}. This OM is currently "
+               + "inactive (not running).", om.getOmRpcServerAddr());
+         }
+       }
+
+       // Set default OM address to point to the first OM. Clients would
+       // try connecting to this address by default
+       conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+           NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+
+       break;
+     } catch (BindException e) {
+       for (OzoneManager om : omList) {
+         om.stop();
+         om.join();
+         LOG.info("Stopping OzoneManager server at {}",
+             om.getOmRpcServerAddr());
+       }
+       omList.clear();
+       // The failed attempt may already have registered OMs as active or
+       // inactive; drop them so the stopped instances do not end up in the
+       // cluster next to the ones created by the retry.
+       activeOMs.clear();
+       inactiveOMs.clear();
+       ++retryCount;
+       LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+           retryCount, e);
+     }
+   }
+   return omList;
+ }
+
+ /**
+  * Creates the SCM service with {@code numOfSCMs} StorageContainerManagers
+  * and starts the first {@code numOfActiveSCMs} of them. The first SCM is
+  * initialized with {@code scmInit}, every further one is bootstrapped with
+  * {@code scmBootstrap}. Ports are derived from a random base port; on a
+  * {@link BindException} everything created so far is stopped and the whole
+  * attempt is repeated with a new base port.
+  *
+  * @return all SCMs created by the successful attempt
+  * @throws IOException if an SCM cannot be initialized or started for a
+  *         reason other than a port conflict
+  * @throws AuthenticationException if SCM creation fails to authenticate
+  */
+ protected List<StorageContainerManager> createSCMService()
+     throws IOException, AuthenticationException {
+   List<StorageContainerManager> scmList = Lists.newArrayList();
+
+   int retryCount = 0;
+
+   while (true) {
+     try {
+       int basePort = 12000 + RANDOM.nextInt(1000) * 4;
+       initSCMHAConfig(basePort);
+
+       for (int i = 1; i <= numOfSCMs; i++) {
+         // Set nodeId
+         String nodeId = SCM_NODE_ID_PREFIX + i;
+         String metaDirPath = path + "/" + nodeId;
+         OzoneConfiguration scmConfig = new OzoneConfiguration(conf);
+         scmConfig.set(OZONE_METADATA_DIRS, metaDirPath);
+         scmConfig.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, nodeId);
+         scmConfig.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+
+         // NOTE(review): configureSCM() is called after scmConfig was
+         // copied from conf - confirm whether its settings are meant to
+         // reach the current node's scmConfig.
+         configureSCM();
+         if (i == 1) {
+           StorageContainerManager.scmInit(scmConfig, clusterId);
+         } else {
+           StorageContainerManager.scmBootstrap(scmConfig);
+         }
+         StorageContainerManager scm = TestUtils.getScmSimple(scmConfig);
+         HealthyPipelineSafeModeRule rule =
+             scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule();
+         if (rule != null) {
+           // Set threshold to wait for safe mode exit -
+           // this is needed since a pipeline is marked open only after
+           // leader election.
+           rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3);
+         }
+         scmList.add(scm);
+
+         if (i <= numOfActiveSCMs) {
+           scm.start();
+           activeSCMs.add(scm);
+           LOG.info("Started SCM RPC server at {}",
+               scm.getClientProtocolServer());
+         } else {
+           inactiveSCMs.add(scm);
+           LOG.info("Intialized SCM at {}. This SCM is currently "
+               + "inactive (not running).", scm.getClientProtocolServer());
+         }
+       }
+
+       break;
+     } catch (BindException e) {
+       for (StorageContainerManager scm : scmList) {
+         scm.stop();
+         scm.join();
+         LOG.info("Stopping StorageContainerManager server at {}",
+             scm.getClientProtocolServer());
+       }
+       scmList.clear();
+       // The failed attempt may already have registered SCMs as active or
+       // inactive; drop them so the stopped instances do not end up in the
+       // cluster next to the ones created by the retry.
+       activeSCMs.clear();
+       inactiveSCMs.clear();
+       ++retryCount;
+       LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+           retryCount, e);
+     }
+   }
+   return scmList;
+ }
+
+ /**
+ * Initialize SCM HA related configurations: registers the SCM service id,
+ * the list of SCM node ids and, for every node, its addresses and ports
+ * derived from the given base port (each node is 10 ports apart).
+ *
+ * @param basePort port from which the first node's ports are derived
+ */
+ private void initSCMHAConfig(int basePort) throws IOException {
+ // Set configurations required for starting the SCM HA service: the
+ // service id is stored both as the list of SCM service ids and as the
+ // default SCM service id.
+ conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
+ conf.set(ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID, scmServiceId);
+ String scmNodesKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+ // Both builders collect ",value" entries; the leading comma is dropped
+ // with substring(1) after the loop.
+ StringBuilder scmNodesKeyValue = new StringBuilder();
+ StringBuilder scmNames = new StringBuilder();
+
+ int port = basePort;
+
+ for (int i = 1; i <= numOfSCMs; i++, port+=10) {
+ String scmNodeId = SCM_NODE_ID_PREFIX + i;
+ scmNodesKeyValue.append(",").append(scmNodeId);
+ // Per-node keys: base key suffixed with service id and node id.
+ String scmAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmHttpAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmHttpsAddrKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, scmServiceId, scmNodeId);
+ String scmRatisPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, scmServiceId, scmNodeId);
+ String dnPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String blockClientKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String ssClientKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
+ scmServiceId, scmNodeId);
+ String scmGrpcPortKey = ConfUtils.addKeySuffixes(
+ ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, scmServiceId, scmNodeId);
+
+ // Port layout per node: +2 http, +3 https, +4 ratis, +5 datanode,
+ // +6 block client, +7 client, +8 grpc.
+ conf.set(scmAddrKey, "127.0.0.1");
+ conf.set(scmHttpAddrKey, "127.0.0.1:" + (port + 2));
+ conf.set(scmHttpsAddrKey, "127.0.0.1:" + (port + 3));
+ conf.setInt(scmRatisPortKey, port + 4);
+ conf.set(dnPortKey, "127.0.0.1:" + (port + 5));
+ conf.set(blockClientKey, "127.0.0.1:" + (port + 6));
+ conf.set(ssClientKey, "127.0.0.1:" + (port + 7));
+ conf.setInt(scmGrpcPortKey, port + 8);
+ scmNames.append(",").append("localhost:" + (port + 5));
+ // NOTE(review): the un-suffixed block client address key is overwritten
+ // on every iteration, so it ends up holding the last node's address -
+ // confirm this is intended.
+ conf.set(ScmConfigKeys.
+ OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:" + (port + 6));
+ }
+
+ conf.set(scmNodesKey, scmNodesKeyValue.substring(1));
+ conf.set(ScmConfigKeys.OZONE_SCM_NAMES, scmNames.substring(1));
+ }
+
+ /**
+ * Initialize HA related configurations.
+ */
+ private void initOMHAConfig(int basePort) throws IOException {
+ // Set configurations required for starting OM HA service, because that
+ // is the serviceID being passed to start Ozone HA cluster.
+ // Here setting internal service and OZONE_OM_SERVICE_IDS_KEY, in this
+ // way in OM start it uses internal service id to find it's service id.
+ conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+ conf.set(OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID, omServiceId);
+ String omNodesKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+ StringBuilder omNodesKeyValue = new StringBuilder();
+
+ int port = basePort;
+
+ for (int i = 1; i <= numOfOMs; i++, port+=6) {
+ String omNodeId = OM_NODE_ID_PREFIX + i;
+ omNodesKeyValue.append(",").append(omNodeId);
+ String omAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+ String omHttpAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
+ String omHttpsAddrKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
+ String omRatisPortKey = ConfUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
+
+ conf.set(omAddrKey, "127.0.0.1:" + port);
+ conf.set(omHttpAddrKey, "127.0.0.1:" + (port + 2));
+ conf.set(omHttpsAddrKey, "127.0.0.1:" + (port + 3));
+ conf.setInt(omRatisPortKey, port + 4);
+ }
+
+ conf.set(omNodesKey, omNodesKeyValue.substring(1));
+ }
+ }
+
+ /**
+  * A consumer whose action is allowed to throw {@link IOException}.
+  *
+  * @param <T> type of the consumed value
+  */
+ @FunctionalInterface
+ public interface CheckedConsumer<T> {
+   /**
+    * Performs the action on the given value.
+    *
+    * @param value the value to act on
+    * @throws IOException if the action fails
+    */
+   void apply(T value) throws IOException;
+ }
+
+ /**
+  * MiniOzoneHAService is a helper class used for both SCM and OM HA.
+  * This class keeps track of active and inactive OM/SCM services.
+  *
+  * @param <Type> the service type (OM or SCM)
+  */
+ static class MiniOzoneHAService<Type> {
+   // Services by id, in registration order: active ones first, then
+   // inactive ones.
+   private final Map<String, Type> serviceMap;
+   private final List<Type> services;
+   private final String serviceId;
+   private final String serviceName;
+
+   // Active services denote OM/SCM services which are up and running
+   private final List<Type> activeServices;
+   private final List<Type> inactiveServices;
+
+   /**
+    * @param name display name used in error messages
+    * @param activeList services considered active; may be {@code null}
+    * @param inactiveList services considered inactive; may be {@code null}
+    * @param serviceId HA service id, {@code null} for a non-HA cluster
+    * @param idProvider extracts the id under which a service is registered
+    * @throws IllegalArgumentException if {@code serviceId} is {@code null}
+    *         but more than one service was given
+    */
+   MiniOzoneHAService(String name, List<Type> activeList,
+       List<Type> inactiveList, String serviceId,
+       Function<Type, String> idProvider) {
+     this.serviceName = name;
+     // Callers are allowed to pass null lists. Substitute empty lists so
+     // that isServiceActive() and startInactiveService() cannot hit a
+     // NullPointerException later on. Non-null lists are kept by reference.
+     this.activeServices =
+         activeList != null ? activeList : new ArrayList<>();
+     this.inactiveServices =
+         inactiveList != null ? inactiveList : new ArrayList<>();
+
+     // Insertion-ordered, so that getServices() and getServiceByIndex()
+     // have a stable, predictable order instead of hash order.
+     this.serviceMap = Maps.newLinkedHashMap();
+     for (Type service : this.activeServices) {
+       this.serviceMap.put(idProvider.apply(service), service);
+     }
+     for (Type service : this.inactiveServices) {
+       this.serviceMap.put(idProvider.apply(service), service);
+     }
+     this.services = new ArrayList<>(serviceMap.values());
+     this.serviceId = serviceId;
+
+     // If the serviceID is null, then this should be a non-HA cluster.
+     if (serviceId == null) {
+       Preconditions.checkArgument(services.size() <= 1);
+     }
+   }
+
+   /** @return the HA service id, {@code null} for a non-HA cluster. */
+   public String getServiceId() {
+     return serviceId;
+   }
+
+   /** @return all registered services, active ones first. */
+   public List<Type> getServices() {
+     return services;
+   }
+
+   /**
+    * @param id id of the service to check
+    * @return whether the service registered under {@code id} is in the
+    *         active list
+    */
+   public boolean isServiceActive(String id) {
+     return activeServices.contains(serviceMap.get(id));
+   }
+
+   /**
+    * @param index position in the list returned by {@link #getServices()}
+    * @return the service at that position
+    */
+   public Type getServiceByIndex(int index) {
+     return this.services.get(index);
+   }
+
+   /**
+    * @param id id of the service to look up
+    * @return the service registered under {@code id}, or {@code null}
+    */
+   public Type getServiceById(String id) {
+     return this.serviceMap.get(id);
+   }
+
+   /**
+    * Starts a service that is currently inactive and moves it from the
+    * inactive to the active list.
+    *
+    * @param id id of the service to start
+    * @param serviceStarter action that actually starts the service
+    * @throws IOException if the service is not in the inactive list, or if
+    *         {@code serviceStarter} fails
+    */
+   public void startInactiveService(String id,
+       CheckedConsumer<Type> serviceStarter) throws IOException {
+     Type service = serviceMap.get(id);
+     if (!inactiveServices.contains(service)) {
+       throw new IOException(serviceName + " is already active.");
+     } else {
+       serviceStarter.apply(service);
+       activeServices.add(service);
+       inactiveServices.remove(service);
+     }
+   }
+ }
+
+ static class OMHAService extends MiniOzoneHAService<OzoneManager> {
+ OMHAService(List<OzoneManager> activeList, List<OzoneManager> inactiveList,
+ String serviceId) {
+ super("OM", activeList, inactiveList, serviceId,
+ OzoneManager::getOMNodeId);
+ }
+ }
+
+ static class SCMHAService extends
+ MiniOzoneHAService<StorageContainerManager> {
+ SCMHAService(List<StorageContainerManager> activeList,
+ List<StorageContainerManager> inactiveList,
+ String serviceId) {
+ super("SCM", activeList, inactiveList, serviceId,
+ StorageContainerManager::getScmId);
+ }
+ }
+
+ /**
+  * @return the list of all SCMs held by the SCM service holder
+  */
+ public List<StorageContainerManager> getStorageContainerManagers() {
+   return scmhaService.getServices();
+ }
+
+ /**
+  * @return the SCM at position 0 of {@link #getStorageContainerManagers()}
+  */
+ public StorageContainerManager getStorageContainerManager() {
+   final List<StorageContainerManager> allSCMs =
+       getStorageContainerManagers();
+   return allSCMs.get(0);
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
index aa40b2e..a28c07a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
@@ -18,63 +18,27 @@
package org.apache.hadoop.ozone;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.ha.ConfUtils;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.recon.ReconServer;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.BindException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import java.util.Collections;
/**
* MiniOzoneOMHAClusterImpl creates a complete in-process Ozone cluster
* with OM HA suitable for running tests. The cluster consists of a set of
* OzoneManagers, StorageContainerManager and multiple DataNodes.
*/
-public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(MiniOzoneOMHAClusterImpl.class);
-
- private Map<String, OzoneManager> ozoneManagerMap;
- private List<OzoneManager> ozoneManagers;
- private String omServiceId;
-
- // Active OMs denote OMs which are up and running
- private List<OzoneManager> activeOMs;
- private List<OzoneManager> inactiveOMs;
-
- private int waitForOMToBeReadyTimeout = 120000; // 2 min
-
- private static final Random RANDOM = new Random();
- private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second
+public final class MiniOzoneOMHAClusterImpl extends MiniOzoneHAClusterImpl {
public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
/**
- * Creates a new MiniOzoneCluster.
+ * Creates a new MiniOzoneOMHACluster.
*
* @throws IOException if there is an I/O error
*/
@@ -87,173 +51,14 @@ public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
List<HddsDatanodeService> hddsDatanodes,
String omServiceId,
ReconServer reconServer) {
- super(conf, scm, hddsDatanodes, reconServer);
-
- this.ozoneManagerMap = Maps.newHashMap();
- if (activeOMList != null) {
- for (OzoneManager om : activeOMList) {
- this.ozoneManagerMap.put(om.getOMNodeId(), om);
- }
- }
- if (inactiveOMList != null) {
- for (OzoneManager om : inactiveOMList) {
- this.ozoneManagerMap.put(om.getOMNodeId(), om);
- }
- }
- this.ozoneManagers = new ArrayList<>(ozoneManagerMap.values());
- this.activeOMs = activeOMList;
- this.inactiveOMs = inactiveOMList;
- this.omServiceId = omServiceId;
-
- // If the serviceID is null, then this should be a non-HA cluster.
- if (omServiceId == null) {
- Preconditions.checkArgument(ozoneManagers.size() <= 1);
- }
- }
-
- /**
- * Creates a new MiniOzoneCluster with all OMs active.
- * This is used by MiniOzoneChaosCluster.
- */
- protected MiniOzoneOMHAClusterImpl(
- OzoneConfiguration conf,
- List<OzoneManager> omList,
- StorageContainerManager scm,
- List<HddsDatanodeService> hddsDatanodes,
- String omServiceId) {
- this(conf, omList, null, scm, hddsDatanodes, omServiceId, null);
- }
-
- @Override
- public String getServiceId() {
- return omServiceId;
- }
-
- /**
- * Returns the first OzoneManager from the list.
- * @return
- */
- @Override
- public OzoneManager getOzoneManager() {
- return this.ozoneManagers.get(0);
- }
-
- @Override
- public OzoneClient getRpcClient() throws IOException {
- if (omServiceId == null) {
- // Non-HA cluster.
- return OzoneClientFactory.getRpcClient(getConf());
- } else {
- // HA cluster
- return OzoneClientFactory.getRpcClient(omServiceId, getConf());
- }
- }
-
- public boolean isOMActive(String omNodeId) {
- return activeOMs.contains(ozoneManagerMap.get(omNodeId));
- }
-
- public OzoneManager getOzoneManager(int index) {
- return this.ozoneManagers.get(index);
- }
-
- public OzoneManager getOzoneManager(String omNodeId) {
- return this.ozoneManagerMap.get(omNodeId);
- }
-
- public List<OzoneManager> getOzoneManagersList() {
- return ozoneManagers;
- }
-
- /**
- * Get OzoneManager leader object.
- * @return OzoneManager object, null if there isn't one or more than one
- */
- public OzoneManager getOMLeader() {
- OzoneManager res = null;
- for (OzoneManager ozoneManager : this.ozoneManagers) {
- if (ozoneManager.isLeaderReady()) {
- if (res != null) {
- // Found more than one leader
- // Return null, expect the caller to retry in a while
- return null;
- }
- // Found a leader
- res = ozoneManager;
- }
- }
- return res;
- }
-
- /**
- * Start a previously inactive OM.
- */
- public void startInactiveOM(String omNodeID) throws IOException {
- OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
- if (!inactiveOMs.contains(ozoneManager)) {
- throw new IOException("OM is already active.");
- } else {
- ozoneManager.start();
- activeOMs.add(ozoneManager);
- inactiveOMs.remove(ozoneManager);
- }
- }
-
- @Override
- public void restartOzoneManager() throws IOException {
- for (OzoneManager ozoneManager : ozoneManagers) {
- ozoneManager.stop();
- ozoneManager.restart();
- }
- }
-
- public void shutdownOzoneManager(OzoneManager ozoneManager) {
- LOG.info("Shutting down OzoneManager " + ozoneManager.getOMNodeId());
-
- ozoneManager.stop();
- }
-
- public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
- throws IOException, TimeoutException, InterruptedException {
- LOG.info("Restarting OzoneManager " + ozoneManager.getOMNodeId());
- ozoneManager.restart();
-
- if (waitForOM) {
- GenericTestUtils.waitFor(ozoneManager::isRunning,
- 1000, waitForOMToBeReadyTimeout);
- }
- }
-
- @Override
- public void stop() {
- for (OzoneManager ozoneManager : ozoneManagers) {
- if (ozoneManager != null) {
- LOG.info("Stopping the OzoneManager {}", ozoneManager.getOMNodeId());
- ozoneManager.stop();
- ozoneManager.join();
- }
- }
- super.stop();
- }
-
- public void stopOzoneManager(int index) {
- ozoneManagers.get(index).stop();
- ozoneManagers.get(index).join();
- }
-
- public void stopOzoneManager(String omNodeId) {
- ozoneManagerMap.get(omNodeId).stop();
- ozoneManagerMap.get(omNodeId).join();
+ super(conf, activeOMList, inactiveOMList, Collections.singletonList(scm),
+ null, hddsDatanodes, omServiceId, null, reconServer);
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
- public static class Builder extends MiniOzoneClusterImpl.Builder {
-
- private static final String NODE_ID_PREFIX = "omNode-";
- private List<OzoneManager> activeOMs = new ArrayList<>();
- private List<OzoneManager> inactiveOMs = new ArrayList<>();
+ public static class Builder extends MiniOzoneHAClusterImpl.Builder {
/**
* Creates a new Builder.
@@ -295,154 +100,16 @@ public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
}
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
- scm, reconServer);
+ Collections.singletonList(scm), reconServer);
- MiniOzoneOMHAClusterImpl cluster = new MiniOzoneOMHAClusterImpl(conf,
- activeOMs, inactiveOMs, scm, hddsDatanodes, omServiceId, reconServer);
+ MiniOzoneClusterImpl cluster = new MiniOzoneOMHAClusterImpl(conf,
+ getActiveOMs(), getInactiveOMs(), scm, hddsDatanodes,
+ omServiceId, reconServer);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
return cluster;
}
-
- protected void initOMRatisConf() {
- conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
- conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
-
- // If test change the following config values we will respect,
- // otherwise we will set lower timeout values.
- long defaultDuration = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
- .getDuration();
- long curRatisRpcTimeout = conf.getTimeDuration(
- OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
- defaultDuration, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
- defaultDuration == curRatisRpcTimeout ?
- RATIS_RPC_TIMEOUT : curRatisRpcTimeout, TimeUnit.MILLISECONDS);
-
- long defaultNodeFailureTimeout =
- OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
- getDuration();
- long curNodeFailureTimeout = conf.getTimeDuration(
- OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
- defaultNodeFailureTimeout,
- OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
- getUnit());
- conf.setTimeDuration(
- OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
- curNodeFailureTimeout == defaultNodeFailureTimeout ?
- NODE_FAILURE_TIMEOUT : curNodeFailureTimeout,
- TimeUnit.MILLISECONDS);
- }
-
- /**
- * Start OM service with multiple OMs.
- */
- protected List<OzoneManager> createOMService() throws IOException,
- AuthenticationException {
-
- List<OzoneManager> omList = Lists.newArrayList();
-
- int retryCount = 0;
- int basePort = 10000;
-
- while (true) {
- try {
- basePort = 10000 + RANDOM.nextInt(1000) * 4;
- initHAConfig(basePort);
-
- for (int i = 1; i<= numOfOMs; i++) {
- // Set nodeId
- String nodeId = NODE_ID_PREFIX + i;
- OzoneConfiguration config = new OzoneConfiguration(conf);
- config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
- // Set the OM http(s) address to null so that the cluster picks
- // up the address set with service ID and node ID in initHAConfig
- config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
- config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
-
- // Set metadata/DB dir base path
- String metaDirPath = path + "/" + nodeId;
- config.set(OZONE_METADATA_DIRS, metaDirPath);
- OMStorage omStore = new OMStorage(config);
- initializeOmStorage(omStore);
-
- OzoneManager om = OzoneManager.createOm(config);
- if (certClient != null) {
- om.setCertClient(certClient);
- }
- omList.add(om);
-
- if (i <= numOfActiveOMs) {
- om.start();
- activeOMs.add(om);
- LOG.info("Started OzoneManager RPC server at {}",
- om.getOmRpcServerAddr());
- } else {
- inactiveOMs.add(om);
- LOG.info("Intialized OzoneManager at {}. This OM is currently "
- + "inactive (not running).", om.getOmRpcServerAddr());
- }
- }
-
- // Set default OM address to point to the first OM. Clients would
- // try connecting to this address by default
- conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
- NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
-
- break;
- } catch (BindException e) {
- for (OzoneManager om : omList) {
- om.stop();
- om.join();
- LOG.info("Stopping OzoneManager server at {}",
- om.getOmRpcServerAddr());
- }
- omList.clear();
- ++retryCount;
- LOG.info("MiniOzoneOMHACluster port conflicts, retried {} times",
- retryCount);
- }
- }
- return omList;
- }
-
- /**
- * Initialize HA related configurations.
- */
- private void initHAConfig(int basePort) throws IOException {
- // Set configurations required for starting OM HA service, because that
- // is the serviceID being passed to start Ozone HA cluster.
- // Here setting internal service and OZONE_OM_SERVICE_IDS_KEY, in this
- // way in OM start it uses internal service id to find it's service id.
- conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
- conf.set(OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID, omServiceId);
- String omNodesKey = ConfUtils.addKeySuffixes(
- OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
- StringBuilder omNodesKeyValue = new StringBuilder();
-
- int port = basePort;
-
- for (int i = 1; i <= numOfOMs; i++, port+=6) {
- String omNodeId = NODE_ID_PREFIX + i;
- omNodesKeyValue.append(",").append(omNodeId);
- String omAddrKey = ConfUtils.addKeySuffixes(
- OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
- String omHttpAddrKey = ConfUtils.addKeySuffixes(
- OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
- String omHttpsAddrKey = ConfUtils.addKeySuffixes(
- OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
- String omRatisPortKey = ConfUtils.addKeySuffixes(
- OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
-
- conf.set(omAddrKey, "127.0.0.1:" + port);
- conf.set(omHttpAddrKey, "127.0.0.1:" + (port + 2));
- conf.set(omHttpsAddrKey, "127.0.0.1:" + (port + 3));
- conf.setInt(omRatisPortKey, port + 4);
- }
-
- conf.set(omNodesKey, omNodesKeyValue.substring(1));
- }
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 9a6db3a..76a4f82 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -39,6 +39,9 @@ import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -326,6 +329,10 @@ public final class TestSecureOzoneCluster {
scmStore.setScmId(scmId);
// writes the version file properties
scmStore.initialize();
+ if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+ SCMRatisServerImpl.initialize(clusterId, scmId,
+ SCMHANodeDetails.loadSCMHAConfig(conf).getLocalNodeDetails(), conf);
+ }
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index b6d46fd..fce4e57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -463,15 +463,17 @@ public class TestStorageContainerManager {
Path scmPath = Paths.get(path, "scm-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+ UUID clusterId = UUID.randomUUID();
+ String testClusterId = clusterId.toString();
// This will initialize SCM
- StorageContainerManager.scmInit(conf, "testClusterId");
+ StorageContainerManager.scmInit(conf, testClusterId);
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
- Assert.assertEquals("testClusterId", scmStore.getClusterID());
- StorageContainerManager.scmInit(conf, "testClusterIdNew");
+ Assert.assertEquals(testClusterId, scmStore.getClusterID());
+ StorageContainerManager.scmInit(conf, testClusterId);
Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
- Assert.assertEquals("testClusterId", scmStore.getClusterID());
+ Assert.assertEquals(testClusterId, scmStore.getClusterID());
}
@Test
@@ -483,9 +485,10 @@ public class TestStorageContainerManager {
Path scmPath = Paths.get(path, "scm-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+ final UUID clusterId = UUID.randomUUID();
// This will initialize SCM
- StorageContainerManager.scmInit(conf, "testClusterId");
- SCMRatisServerImpl.validateRatisGroupExists(conf, "testClusterId");
+ StorageContainerManager.scmInit(conf, clusterId.toString());
+ SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId.toString());
}
@Test
@@ -500,11 +503,11 @@ public class TestStorageContainerManager {
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
cluster.waitForClusterToBeReady();
try {
+ final UUID clusterId = UUID.randomUUID();
// This will initialize SCM
- StorageContainerManager.scmInit(conf, "testClusterId");
+ StorageContainerManager.scmInit(conf, clusterId.toString());
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
- Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
- Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
+ Assert.assertNotEquals(clusterId.toString(), scmStore.getClusterID());
} finally {
cluster.shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 9057b0c..15f4581 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -114,6 +114,7 @@ public class TestContainerStateMachine {
.setHbInterval(200)
.setCertificateClient(new CertificateClientTestImpl(conf))
.build();
+ cluster.setWaitForClusterToBeReadyTimeout(300000);
cluster.waitForClusterToBeReady();
cluster.getOzoneManager().startSecretManager();
//the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
new file mode 100644
index 0000000..abf1e3d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.Rule;
+
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * Tests for Storage Container Manager (SCM) HA.
+ */
+public class TestStorageContainerManagerHA {
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private OzoneConfiguration conf;
+ private String clusterId;
+ private String scmId;
+ private String omServiceId;
+ private static int numOfOMs = 3;
+ private String scmServiceId;
+ private static int numOfSCMs = 3;
+
+
+ @Rule
+ public Timeout timeout = new Timeout(300_000);
+
+ /**
+ * Builds a MiniOzoneHAClusterImpl configured with numOfSCMs Storage
+ * Container Managers and numOfOMs Ozone Managers, using freshly generated
+ * random cluster and SCM ids, and waits for the cluster to be ready.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void init() throws Exception {
+ conf = new OzoneConfiguration();
+ clusterId = UUID.randomUUID().toString();
+ scmId = UUID.randomUUID().toString();
+ omServiceId = "om-service-test1";
+ scmServiceId = "scm-service-test1";
+ cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOMServiceId(omServiceId)
+ .setSCMServiceId(scmServiceId)
+ .setNumOfStorageContainerManagers(numOfSCMs)
+ .setNumOfOzoneManagers(numOfOMs)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ /**
+ * Shuts down the mini Ozone HA cluster, if one was created by init().
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that the cluster exposes numOfSCMs SCMs and numOfOMs OMs, that
+ * the Ratis group of the SCM returned by
+ * cluster.getStorageContainerManager() has numOfSCMs peers, that exactly
+ * one SCM passes checkLeader() and exactly one OM passes isLeaderReady(),
+ * and finally runs {@link #testPutKey()} against the cluster.
+ */
+ @Test
+ public void testAllSCMAreRunning() throws Exception {
+ List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+ Assert.assertEquals(numOfSCMs, scms.size());
+ // The peer count is read from a single SCM and does not depend on the
+ // loop below, so assert it once; assertEquals reports both values on
+ // failure.
+ int peerSize = cluster.getStorageContainerManager().getScmHAManager()
+ .getRatisServer().getDivision().getGroup().getPeers().size();
+ Assert.assertEquals(numOfSCMs, peerSize);
+ int count = 0;
+ for (StorageContainerManager scm : scms) {
+ if (scm.checkLeader()) {
+ count++;
+ }
+ }
+ Assert.assertEquals(1, count);
+ count = 0;
+ List<OzoneManager> oms = cluster.getOzoneManagersList();
+ Assert.assertEquals(numOfOMs, oms.size());
+ for (OzoneManager om : oms) {
+ if (om.isLeaderReady()) {
+ count++;
+ }
+ }
+ Assert.assertEquals(1, count);
+ testPutKey();
+ }
+
+ public void testPutKey() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ Instant testStartTime = Instant.now();
+ ObjectStore store =
+ OzoneClientFactory.getRpcClient(cluster.getConf()).getObjectStore();
+ String value = "sample value";
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String keyName = UUID.randomUUID().toString();
+
+ OzoneOutputStream out = bucket
+ .createKey(keyName, value.getBytes(UTF_8).length, STAND_ALONE, ONE,
+ new HashMap<>());
+ out.write(value.getBytes(UTF_8));
+ out.close();
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+ is.read(fileContent);
+ Assert.assertEquals(value, new String(fileContent, UTF_8));
+ Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
+ Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
+ is.close();
+ final OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+ .setRefreshPipeline(true).build();
+ final OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ final List<OmKeyLocationInfo> keyLocationInfos =
+ keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ long index = -1;
+ for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+ if (scm.checkLeader()) {
+ index = getLastAppliedIndex(scm);
+ }
+ }
+ Assert.assertFalse(index == -1);
+ long finalIndex = index;
+ // Ensure all follower scms have caught up with the leader
+ GenericTestUtils.waitFor(() -> areAllScmInSync(finalIndex), 100, 10000);
+ final long containerID = keyLocationInfos.get(0).getContainerID();
+ for (int k = 0; k < numOfSCMs; k++) {
+ StorageContainerManager scm =
+ cluster.getStorageContainerManagers().get(k);
+ // flush to DB on each SCM
+ ((SCMRatisServerImpl) scm.getScmHAManager().getRatisServer())
+ .getStateMachine().takeSnapshot();
+ Assert.assertTrue(scm.getContainerManager()
+ .containerExist(ContainerID.valueOf(containerID)));
+ Assert.assertNotNull(scm.getScmMetadataStore().getContainerTable()
+ .get(ContainerID.valueOf(containerID)));
+ }
+ }
+
+ /**
+ * Returns the last applied index reported by the division info of the
+ * given SCM's Ratis server.
+ *
+ * @param scm the SCM to query
+ * @return the value of getLastAppliedIndex() for that SCM
+ */
+ private long getLastAppliedIndex(StorageContainerManager scm) {
+ return scm.getScmHAManager().getRatisServer().getDivision().getInfo()
+ .getLastAppliedIndex();
+ }
+
+ /**
+ * Checks whether every SCM in the cluster has applied its log at least up
+ * to the given index.
+ *
+ * @param leaderIndex last applied index previously captured from the
+ * leader SCM
+ * @return true only if the cluster has at least one SCM and no SCM's last
+ * applied index is below {@code leaderIndex}
+ */
+ private boolean areAllScmInSync(long leaderIndex) {
+ List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+ if (scms.isEmpty()) {
+ return false;
+ }
+ // Every SCM has to be checked: overwriting a single flag per iteration
+ // would let the last SCM in the list hide a follower that is still
+ // behind. An SCM that has already applied entries past leaderIndex has
+ // caught up as well, hence "below" rather than "not equal".
+ for (StorageContainerManager scm : scms) {
+ if (getLastAppliedIndex(scm) < leaderIndex) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 49d9f47..32b2ac9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -183,6 +184,11 @@ public class ScmBlockLocationTestingClient implements ScmBlockLocationProtocol {
}
@Override
+ public boolean addSCM(AddSCMRequest request) throws IOException {
+ return false;
+ }
+
+ @Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org