You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/04/23 04:25:06 UTC

[ozone] branch master updated: HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. (#2155)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new bef180e  HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. (#2155)
bef180e is described below

commit bef180e11d73cc994f19b1b5350cb6aa8417b531
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Apr 23 09:54:42 2021 +0530

    HDDS-5103. Fix Install Snapshot Mechanism in SCMStateMachine. (#2155)
---
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     | 14 ++----
 .../hdds/scm/ha/InterSCMGrpcProtocolService.java   |  1 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       | 14 +++++-
 .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java  |  4 ++
 .../hadoop/hdds/scm/ha/SCMSnapshotDownloader.java  |  2 +-
 .../hadoop/hdds/scm/ha/SCMSnapshotProvider.java    | 28 +++++------
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 19 ++++++--
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |  4 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    | 56 ++++------------------
 pom.xml                                            |  6 +--
 10 files changed, 63 insertions(+), 85 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
index 2c4b98f..08dd307 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -52,13 +51,10 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
   private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
       client;
 
-  private final long timeout;
-
-  public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
-    Preconditions.checkNotNull(conf);
-    int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
-        ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
-    timeout =
+  public InterSCMGrpcClient(final String host, final int leaderPort,
+      final ConfigurationSource conf) {
+    final int port = leaderPort;
+    final long  timeout =
         conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(host, port).usePlaintext()
@@ -95,7 +91,7 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     shutdown();
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
index d92220a..b6f08a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -69,6 +69,7 @@ public class InterSCMGrpcProtocolService {
       LOG.info("Ignore. already started.");
       return;
     } else {
+      LOG.info("Starting SCM Grpc Service at port {}", port);
       server.start();
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index dca469a..00aff5ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -101,14 +101,19 @@ public class SCMHAManagerImpl implements SCMHAManager {
     if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
       // this is a bootstrapped node
       // It will first try to add itself to existing ring
-      boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
+      final SCMNodeDetails nodeDetails =
+          scm.getSCMHANodeDetails().getLocalNodeDetails();
+      final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
           new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
               .setScmId(scm.getScmId())
-              .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails()
+              .setRatisAddr(nodeDetails
                   // TODO : Should we use IP instead of hostname??
                   .getRatisHostPortStr()).build(), scm.getSCMNodeId());
       if (!success) {
         throw new IOException("Adding SCM to existing HA group failed");
+      } else {
+        LOG.info("Successfully added SCM {} to group {}",
+            nodeDetails.getNodeId(), ratisServer.getDivision().getGroup());
       }
     } else {
       LOG.info(" scm role is {} peers {}",
@@ -357,6 +362,11 @@ public class SCMHAManagerImpl implements SCMHAManager {
   }
 
   @VisibleForTesting
+  public void stopGrpcService() {
+    grpcServer.stop();
+  }
+
+  @VisibleForTesting
   public static Logger getLogger() {
     return LOG;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 47b0a23..17901ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -210,4 +210,8 @@ public final class SCMNodeDetails extends NodeDetails {
   public String getDatanodeAddressKey() {
     return datanodeAddressKey;
   }
+
+  public int getGrpcPort() {
+    return grpcPort;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
index 7d5d3eb..7713a8c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
@@ -39,5 +39,5 @@ public interface SCMSnapshotDownloader {
    */
   CompletableFuture<Path> download(Path destination) throws IOException;
 
-  void close() throws Exception;
+  void close();
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
index 093b810..e5bdfbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
@@ -55,8 +55,6 @@ public class SCMSnapshotProvider {
 
   private final ConfigurationSource conf;
 
-  private SCMSnapshotDownloader client;
-
   private Map<String, SCMNodeDetails> peerNodesMap;
 
   public SCMSnapshotProvider(ConfigurationSource conf,
@@ -81,13 +79,13 @@ public class SCMSnapshotProvider {
         this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
       }
     }
-    this.client = null;
   }
 
   @VisibleForTesting
   public void setPeerNodesMap(Map<String, SCMNodeDetails> peerNodesMap) {
     this.peerNodesMap = peerNodesMap;
   }
+
   /**
    * Download the latest checkpoint from SCM Leader .
    * @param leaderSCMNodeID leader SCM Node ID.
@@ -103,18 +101,19 @@ public class SCMSnapshotProvider {
             .getAbsolutePath();
     File targetFile = new File(snapshotFilePath + ".tar.gz");
 
-    // the client instance will be initialized only when first install snapshot
-    // notification from ratis leader will be received.
-    if (client == null) {
-      client = new InterSCMGrpcClient(
-          peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
-          conf);
-    }
+    // A new downloadClient instance is created each time an install snapshot
+    // request is received. It is not cached, as such requests should be rare.
+    int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort();
+    SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient(
+        peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
+        port, conf);
     try {
-      client.download(targetFile.toPath()).get();
-    } catch (InterruptedException | ExecutionException e) {
+      downloadClient.download(targetFile.toPath()).get();
+    } catch (ExecutionException | InterruptedException e) {
       LOG.error("Rocks DB checkpoint downloading failed", e);
       throw new IOException(e);
+    } finally {
+      downloadClient.close();
     }
 
 
@@ -136,9 +135,4 @@ public class SCMSnapshotProvider {
     return scmSnapshotDir;
   }
 
-  public void stop() throws Exception {
-    if (client != null) {
-      client.close();
-    }
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 29dcf75..bef743b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -51,6 +52,7 @@ import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -184,11 +186,20 @@ public class SCMStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
       RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
-
-    String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo()
-        .getLeaderInfo().getId().getId()).toString();
+    if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
+      return JavaUtils.completeExceptionally(new IOException("Failed to " +
+          "notifyInstallSnapshotFromLeader due to missing leader info"));
+    }
+    String leaderAddress = roleInfoProto.getFollowerInfo()
+        .getLeaderInfo().getId().getAddress();
+    Optional<SCMNodeDetails> leaderDetails =
+        scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter(
+            p -> p.getRatisHostPortStr().equals(leaderAddress))
+            .findFirst();
+    Preconditions.checkState(leaderDetails.isPresent());
+    final String leaderNodeId = leaderDetails.get().getNodeId();
     LOG.info("Received install snapshot notification from SCM leader: {} with "
-        + "term index: {}", leaderNodeId, firstTermIndexInLog);
+        + "term index: {}", leaderAddress, firstTermIndexInLog);
 
     CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
         () -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
index a7864b0..5f35b12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -100,7 +100,9 @@ public class TestSCMInstallSnapshot {
             new RatisReplicationConfig(ONE), "Owner2").getPipelineID());
     pipelineManager.openPipeline(ratisPipeline2.getId());
     SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
-        .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1")
+        .setRpcAddress(new InetSocketAddress("0.0.0.0", 0))
+        .setGrpcPort(ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT)
+        .setSCMNodeId("scm1")
         .build();
     Map<String, SCMNodeDetails> peerMap = new HashMap<>();
     peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index 1d8646f..a0196c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -51,7 +51,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.Disabled;
 import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
@@ -71,7 +70,7 @@ public class TestSCMInstallSnapshotWithHA {
   private int numOfSCMs = 3;
 
   private static final long SNAPSHOT_THRESHOLD = 5;
- // private static final int LOG_PURGE_GAP = 5;
+  private static final int LOG_PURGE_GAP = 5;
 
   /**
    * Create a MiniOzoneCluster for testing.
@@ -87,8 +86,8 @@ public class TestSCMInstallSnapshotWithHA {
     scmServiceId = "scm-service-test1";
     SCMHAConfiguration scmhaConfiguration =
         conf.getObject(SCMHAConfiguration.class);
-  //  scmhaConfiguration.setRaftLogPurgeEnabled(true);
-  //  scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
+    scmhaConfiguration.setRaftLogPurgeEnabled(true);
+    scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
     scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD);
     conf.setFromObject(scmhaConfiguration);
 
@@ -114,22 +113,10 @@ public class TestSCMInstallSnapshotWithHA {
     }
   }
 
-  /**
-   * This test is disabled for now as there seems to be an issue with
-   * Ratis install Snapshot code. In ratis while a new node gets added,
-   * unless and until the node gets added to the voter list, the follower state
-   * is not updated with leader info. So, while an install snapshot notification
-   * is received in the leader, the leader info is not set and hence, out of
-   * ratis transfer using the same leader info doesn't work.
-   *
-   * TODO: Fix this
-   * */
   @Test
-  @Disabled
   public void testInstallSnapshot() throws Exception {
     // Get the leader SCM
     StorageContainerManager leaderSCM = getLeader(cluster);
-    String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();
     Assert.assertNotNull(leaderSCM);
     // Find the inactive SCM
     String followerId = getInactiveSCM(cluster).getScmId();
@@ -138,20 +125,9 @@ public class TestSCMInstallSnapshotWithHA {
     // Do some transactions so that the log index increases
     List<ContainerInfo> containers = writeToIncreaseLogIndex(leaderSCM, 200);
 
-    // Get the latest db checkpoint from the leader SCM.
-    TransactionInfo transactionInfo =
-        leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer()
-            .getLatestTrxInfo();
-    TermIndex leaderTermIndex =
-        TermIndex.valueOf(transactionInfo.getTerm(),
-            transactionInfo.getTransactionIndex());
-    long leaderSnaphsotIndex = leaderTermIndex.getIndex();
-    long leaderSnapshotTermIndex = leaderTermIndex.getTerm();
-
-    DBCheckpoint leaderDbCheckpoint =
-        leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false);
-
-    // Start the inactive
+    // Start the inactive SCM. Install Snapshot will happen as part of the
+    // setConfiguration() call to the Ratis leader, and the follower will
+    // catch up.
     cluster.startInactiveSCM(followerId);
 
     // The recently started  should be lagging behind the leader .
@@ -159,23 +135,7 @@ public class TestSCMInstallSnapshotWithHA {
         follower.getScmHAManager().getRatisServer().getSCMStateMachine()
             .getLastAppliedTermIndex().getIndex();
     assertTrue(
-        followerLastAppliedIndex < leaderSnaphsotIndex);
-
-    SCMHAManagerImpl scmhaManager =
-        (SCMHAManagerImpl) (follower.getScmHAManager());
-    // Install leader 's db checkpoint on the lagging .
-    scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint);
-
-    SCMStateMachine followerStateMachine =
-        follower.getScmHAManager().getRatisServer().getSCMStateMachine();
-    // After the new checkpoint is installed, the follower
-    // lastAppliedIndex must >= the snapshot index of the checkpoint. It
-    // could be great than snapshot index if there is any conf entry from ratis.
-    followerLastAppliedIndex = followerStateMachine
-            .getLastAppliedTermIndex().getIndex();
-    assertTrue(followerLastAppliedIndex >= leaderSnaphsotIndex);
-    assertTrue(followerStateMachine
-        .getLastAppliedTermIndex().getTerm() >= leaderSnapshotTermIndex);
+        followerLastAppliedIndex >= 200);
 
     // Verify that the follower 's DB contains the transactions which were
     // made while it was inactive.
@@ -318,7 +278,7 @@ public class TestSCMInstallSnapshotWithHA {
         scm.getScmHAManager().getRatisServer().getSCMStateMachine();
     long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine()
         .getLastAppliedTermIndex().getIndex();
-    while (logIndex < targetLogIndex) {
+    while (logIndex <= targetLogIndex) {
       containers.add(scm.getContainerManager()
           .allocateContainer(
               new RatisReplicationConfig(ReplicationFactor.THREE),
diff --git a/pom.xml b/pom.xml
index f012350..2a38358 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,10 +79,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     <declared.ozone.version>${ozone.version}</declared.ozone.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>2.0.0</ratis.version>
+    <ratis.version>2.1.0-43915d2-SNAPSHOT</ratis.version>
 
     <!-- Apache Ratis thirdparty version -->
-    <ratis.thirdparty.version>0.6.0</ratis.thirdparty.version>
+    <ratis.thirdparty.version>0.7.0-a398b19-SNAPSHOT</ratis.thirdparty.version>
 
     <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
     <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -183,7 +183,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     <grpc-compile.version>1.33.0</grpc-compile.version>
     <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
 
-    <netty.version>4.1.51.Final</netty.version>
+    <netty.version>4.1.63.Final</netty.version>
 
     <!-- define the Java language version used by the compiler -->
     <javac.version>1.8</javac.version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org