You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/04/23 04:25:06 UTC
[ozone] branch master updated: HDDS-5103. Fix Install Snapshot
Mechanism in SCMStateMachine. (#2155)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bef180e HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. (#2155)
bef180e is described below
commit bef180e11d73cc994f19b1b5350cb6aa8417b531
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Apr 23 09:54:42 2021 +0530
HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. (#2155)
---
.../hadoop/hdds/scm/ha/InterSCMGrpcClient.java | 14 ++----
.../hdds/scm/ha/InterSCMGrpcProtocolService.java | 1 +
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 14 +++++-
.../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 4 ++
.../hadoop/hdds/scm/ha/SCMSnapshotDownloader.java | 2 +-
.../hadoop/hdds/scm/ha/SCMSnapshotProvider.java | 28 +++++------
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 19 ++++++--
.../hadoop/hdds/scm/TestSCMInstallSnapshot.java | 4 +-
.../ozone/scm/TestSCMInstallSnapshotWithHA.java | 56 ++++------------------
pom.xml | 6 +--
10 files changed, 63 insertions(+), 85 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
index 2c4b98f..08dd307 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -52,13 +51,10 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
client;
- private final long timeout;
-
- public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
- Preconditions.checkNotNull(conf);
- int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
- ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
- timeout =
+ public InterSCMGrpcClient(final String host, final int leaderPort,
+ final ConfigurationSource conf) {
+ final int port = leaderPort;
+ final long timeout =
conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port).usePlaintext()
@@ -95,7 +91,7 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
}
@Override
- public void close() throws Exception {
+ public void close() {
shutdown();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
index d92220a..b6f08a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -69,6 +69,7 @@ public class InterSCMGrpcProtocolService {
LOG.info("Ignore. already started.");
return;
} else {
+ LOG.info("Starting SCM Grpc Service at port {}", port);
server.start();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index dca469a..00aff5ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -101,14 +101,19 @@ public class SCMHAManagerImpl implements SCMHAManager {
if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
// this is a bootstrapped node
// It will first try to add itself to existing ring
- boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
+ final SCMNodeDetails nodeDetails =
+ scm.getSCMHANodeDetails().getLocalNodeDetails();
+ final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
.setScmId(scm.getScmId())
- .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails()
+ .setRatisAddr(nodeDetails
// TODO : Should we use IP instead of hostname??
.getRatisHostPortStr()).build(), scm.getSCMNodeId());
if (!success) {
throw new IOException("Adding SCM to existing HA group failed");
+ } else {
+ LOG.info("Successfully added SCM {} to group {}",
+ nodeDetails.getNodeId(), ratisServer.getDivision().getGroup());
}
} else {
LOG.info(" scm role is {} peers {}",
@@ -357,6 +362,11 @@ public class SCMHAManagerImpl implements SCMHAManager {
}
@VisibleForTesting
+ public void stopGrpcService() {
+ grpcServer.stop();
+ }
+
+ @VisibleForTesting
public static Logger getLogger() {
return LOG;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 47b0a23..17901ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -210,4 +210,8 @@ public final class SCMNodeDetails extends NodeDetails {
public String getDatanodeAddressKey() {
return datanodeAddressKey;
}
+
+ public int getGrpcPort() {
+ return grpcPort;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
index 7d5d3eb..7713a8c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
@@ -39,5 +39,5 @@ public interface SCMSnapshotDownloader {
*/
CompletableFuture<Path> download(Path destination) throws IOException;
- void close() throws Exception;
+ void close();
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
index 093b810..e5bdfbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
@@ -55,8 +55,6 @@ public class SCMSnapshotProvider {
private final ConfigurationSource conf;
- private SCMSnapshotDownloader client;
-
private Map<String, SCMNodeDetails> peerNodesMap;
public SCMSnapshotProvider(ConfigurationSource conf,
@@ -81,13 +79,13 @@ public class SCMSnapshotProvider {
this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
}
}
- this.client = null;
}
@VisibleForTesting
public void setPeerNodesMap(Map<String, SCMNodeDetails> peerNodesMap) {
this.peerNodesMap = peerNodesMap;
}
+
/**
* Download the latest checkpoint from SCM Leader .
* @param leaderSCMNodeID leader SCM Node ID.
@@ -103,18 +101,19 @@ public class SCMSnapshotProvider {
.getAbsolutePath();
File targetFile = new File(snapshotFilePath + ".tar.gz");
- // the client instance will be initialized only when first install snapshot
- // notification from ratis leader will be received.
- if (client == null) {
- client = new InterSCMGrpcClient(
- peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
- conf);
- }
+ // A new downloadClient is created each time an install snapshot request
+ // is received. It is not cached, as such requests should be very rare.
+ int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort();
+ SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient(
+ peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
+ port, conf);
try {
- client.download(targetFile.toPath()).get();
- } catch (InterruptedException | ExecutionException e) {
+ downloadClient.download(targetFile.toPath()).get();
+ } catch (ExecutionException | InterruptedException e) {
LOG.error("Rocks DB checkpoint downloading failed", e);
throw new IOException(e);
+ } finally {
+ downloadClient.close();
}
@@ -136,9 +135,4 @@ public class SCMSnapshotProvider {
return scmSnapshotDir;
}
- public void stop() throws Exception {
- if (client != null) {
- client.close();
- }
- }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 29dcf75..bef743b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import java.util.Collection;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -51,6 +52,7 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,11 +186,20 @@ public class SCMStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
-
- String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo()
- .getLeaderInfo().getId().getId()).toString();
+ if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
+ return JavaUtils.completeExceptionally(new IOException("Failed to " +
+ "notifyInstallSnapshotFromLeader due to missing leader info"));
+ }
+ String leaderAddress = roleInfoProto.getFollowerInfo()
+ .getLeaderInfo().getId().getAddress();
+ Optional<SCMNodeDetails> leaderDetails =
+ scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter(
+ p -> p.getRatisHostPortStr().equals(leaderAddress))
+ .findFirst();
+ Preconditions.checkState(leaderDetails.isPresent());
+ final String leaderNodeId = leaderDetails.get().getNodeId();
LOG.info("Received install snapshot notification from SCM leader: {} with "
- + "term index: {}", leaderNodeId, firstTermIndexInLog);
+ + "term index: {}", leaderAddress, firstTermIndexInLog);
CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
() -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
index a7864b0..5f35b12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -100,7 +100,9 @@ public class TestSCMInstallSnapshot {
new RatisReplicationConfig(ONE), "Owner2").getPipelineID());
pipelineManager.openPipeline(ratisPipeline2.getId());
SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
- .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1")
+ .setRpcAddress(new InetSocketAddress("0.0.0.0", 0))
+ .setGrpcPort(ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT)
+ .setSCMNodeId("scm1")
.build();
Map<String, SCMNodeDetails> peerMap = new HashMap<>();
peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index 1d8646f..a0196c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -51,7 +51,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.Disabled;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@@ -71,7 +70,7 @@ public class TestSCMInstallSnapshotWithHA {
private int numOfSCMs = 3;
private static final long SNAPSHOT_THRESHOLD = 5;
- // private static final int LOG_PURGE_GAP = 5;
+ private static final int LOG_PURGE_GAP = 5;
/**
* Create a MiniOzoneCluster for testing.
@@ -87,8 +86,8 @@ public class TestSCMInstallSnapshotWithHA {
scmServiceId = "scm-service-test1";
SCMHAConfiguration scmhaConfiguration =
conf.getObject(SCMHAConfiguration.class);
- // scmhaConfiguration.setRaftLogPurgeEnabled(true);
- // scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
+ scmhaConfiguration.setRaftLogPurgeEnabled(true);
+ scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD);
conf.setFromObject(scmhaConfiguration);
@@ -114,22 +113,10 @@ public class TestSCMInstallSnapshotWithHA {
}
}
- /**
- * This test is disabled for now as there seems to be an issue with
- * Ratis install Snapshot code. In ratis while a new node gets added,
- * unless and until the node gets added to the voter list, the follower state
- * is not updated with leader info. So, while an install snapshot notification
- * is received in the leader, the leader info is not set and hence, out of
- * ratis transfer using the same leader info doesn't work.
- *
- * TODO: Fix this
- * */
@Test
- @Disabled
public void testInstallSnapshot() throws Exception {
// Get the leader SCM
StorageContainerManager leaderSCM = getLeader(cluster);
- String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();
Assert.assertNotNull(leaderSCM);
// Find the inactive SCM
String followerId = getInactiveSCM(cluster).getScmId();
@@ -138,20 +125,9 @@ public class TestSCMInstallSnapshotWithHA {
// Do some transactions so that the log index increases
List<ContainerInfo> containers = writeToIncreaseLogIndex(leaderSCM, 200);
- // Get the latest db checkpoint from the leader SCM.
- TransactionInfo transactionInfo =
- leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer()
- .getLatestTrxInfo();
- TermIndex leaderTermIndex =
- TermIndex.valueOf(transactionInfo.getTerm(),
- transactionInfo.getTransactionIndex());
- long leaderSnaphsotIndex = leaderTermIndex.getIndex();
- long leaderSnapshotTermIndex = leaderTermIndex.getTerm();
-
- DBCheckpoint leaderDbCheckpoint =
- leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false);
-
- // Start the inactive
+ // Start the inactive SCM. Install snapshot will happen as part of the
+ // setConfiguration() call to the Ratis leader, and the follower will
+ // then catch up.
cluster.startInactiveSCM(followerId);
// The recently started should be lagging behind the leader .
@@ -159,23 +135,7 @@ public class TestSCMInstallSnapshotWithHA {
follower.getScmHAManager().getRatisServer().getSCMStateMachine()
.getLastAppliedTermIndex().getIndex();
assertTrue(
- followerLastAppliedIndex < leaderSnaphsotIndex);
-
- SCMHAManagerImpl scmhaManager =
- (SCMHAManagerImpl) (follower.getScmHAManager());
- // Install leader 's db checkpoint on the lagging .
- scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint);
-
- SCMStateMachine followerStateMachine =
- follower.getScmHAManager().getRatisServer().getSCMStateMachine();
- // After the new checkpoint is installed, the follower
- // lastAppliedIndex must >= the snapshot index of the checkpoint. It
- // could be great than snapshot index if there is any conf entry from ratis.
- followerLastAppliedIndex = followerStateMachine
- .getLastAppliedTermIndex().getIndex();
- assertTrue(followerLastAppliedIndex >= leaderSnaphsotIndex);
- assertTrue(followerStateMachine
- .getLastAppliedTermIndex().getTerm() >= leaderSnapshotTermIndex);
+ followerLastAppliedIndex >= 200);
// Verify that the follower 's DB contains the transactions which were
// made while it was inactive.
@@ -318,7 +278,7 @@ public class TestSCMInstallSnapshotWithHA {
scm.getScmHAManager().getRatisServer().getSCMStateMachine();
long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine()
.getLastAppliedTermIndex().getIndex();
- while (logIndex < targetLogIndex) {
+ while (logIndex <= targetLogIndex) {
containers.add(scm.getContainerManager()
.allocateContainer(
new RatisReplicationConfig(ReplicationFactor.THREE),
diff --git a/pom.xml b/pom.xml
index f012350..2a38358 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,10 +79,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<declared.ozone.version>${ozone.version}</declared.ozone.version>
<!-- Apache Ratis version -->
- <ratis.version>2.0.0</ratis.version>
+ <ratis.version>2.1.0-43915d2-SNAPSHOT</ratis.version>
<!-- Apache Ratis thirdparty version -->
- <ratis.thirdparty.version>0.6.0</ratis.thirdparty.version>
+ <ratis.thirdparty.version>0.7.0-a398b19-SNAPSHOT</ratis.thirdparty.version>
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -183,7 +183,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<grpc-compile.version>1.33.0</grpc-compile.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
- <netty.version>4.1.51.Final</netty.version>
+ <netty.version>4.1.63.Final</netty.version>
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org