Posted to commits@ozone.apache.org by sh...@apache.org on 2021/02/18 23:52:29 UTC
[ozone] branch HDDS-2823 updated: HDDS-4773. Add functionality to
transfer Rocks db checkpoint from leader to follower (#1870)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 1cbd6e3 HDDS-4773. Add functionality to transfer Rocks db checkpoint from leader to follower (#1870)
1cbd6e3 is described below
commit 1cbd6e3c5229b398c98e4fe8fc5a39114283e3be
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Feb 19 05:22:15 2021 +0530
HDDS-4773. Add functionality to transfer Rocks db checkpoint from leader to follower (#1870)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 9 ++
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 +-
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 4 +
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 59 +++++++
.../dev-support/findbugsExcludeFile.xml | 3 +
hadoop-hdds/interface-client/pom.xml | 13 ++
.../src/main/proto/InterSCMProtocol.proto | 46 ++++++
.../hadoop/hdds/scm/ha/InterSCMGrpcClient.java | 172 +++++++++++++++++++++
.../hdds/scm/ha/InterSCMGrpcProtocolService.java | 85 ++++++++++
.../hadoop/hdds/scm/ha/InterSCMGrpcService.java | 66 ++++++++
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 5 +
.../hdds/scm/ha/SCMDBCheckpointProvider.java | 88 +++++++++++
.../hadoop/hdds/scm/ha/SCMGrpcOutputStream.java | 131 ++++++++++++++++
.../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 41 +++++
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 5 +
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 76 +++++++++
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 39 +++++
...CMHAManager.java => SCMSnapshotDownloader.java} | 37 ++---
.../hadoop/hdds/scm/ha/SCMSnapshotProvider.java | 143 +++++++++++++++++
.../hadoop/hdds/scm/TestSCMInstallSnapshot.java | 100 ++++++++++++
20 files changed, 1106 insertions(+), 23 deletions(-)
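Taken together, the pieces below work as follows: the Ratis leader runs InterSCMGrpcProtocolService, whose InterSCMGrpcService streams a tar.gz of the SCM RocksDB checkpoint through SCMGrpcOutputStream; a follower's SCMSnapshotProvider drives InterSCMGrpcClient to download and untar it. A minimal follower-side sketch of the call path (the leader node id "scm1" is illustrative, not part of this patch):

    // Sketch only: follower-side download path, mirroring the test at
    // the end of this patch. `scm` is a running StorageContainerManager
    // whose peer map contains the leader's node details.
    SCMSnapshotProvider provider =
        scm.getScmHAManager().getSCMSnapshotProvider();
    DBCheckpoint checkpoint = provider.getSCMDBSnapshot("scm1");
    // checkpoint.getCheckpointLocation() is the untarred RocksDB dir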
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ae8b00b..d8b6439 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds;
import javax.management.ObjectName;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -490,6 +491,14 @@ public final class HddsUtils {
"Path should be a descendant of %s", ancestor);
}
+ public static File createDir(String dirPath) {
+ File dirFile = new File(dirPath);
+ if (!dirFile.mkdirs() && !dirFile.exists()) {
+ throw new IllegalArgumentException("Unable to create path: " + dirFile);
+ }
+ return dirFile;
+ }
+
/**
* Leverages the Configuration.getPassword method to attempt to get
* passwords from the CredentialProvider API before falling back to
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index b96ab64..17fb7a0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -195,6 +195,10 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_HTTPS_ADDRESS_KEY =
"ozone.scm.https-address";
+ public static final String OZONE_SCM_ADDRESS_KEY =
+ "ozone.scm.address";
+ public static final String OZONE_SCM_BIND_HOST_DEFAULT =
+ "0.0.0.0";
public static final String OZONE_SCM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0";
public static final int OZONE_SCM_HTTP_BIND_PORT_DEFAULT = 9876;
public static final int OZONE_SCM_HTTPS_BIND_PORT_DEFAULT = 9877;
@@ -283,9 +287,6 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_NODE_ID_KEY =
"ozone.scm.node.id";
- public static final String OZONE_SCM_ADDRESS_KEY =
- "ozone.scm.address";
-
public static final int OZONE_SCM_DEFAULT_PORT =
OZONE_SCM_DATANODE_PORT_DEFAULT;
// The path where datanode ID is to be written to.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 6e05136..1fd54e5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -125,6 +125,7 @@ public final class OzoneConsts {
public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String OM_DB_NAME = "om.db";
+ public static final String SCM_DB_NAME = "scm.db";
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
public static final String STORAGE_DIR_CHUNKS = "chunks";
@@ -398,7 +399,10 @@ public final class OzoneConsts {
public static final String DB_TRANSIENT_MARKER = "dbInconsistentMarker";
+ // TODO : rename this to OZONE_RATIS_SNAPSHOT_DIR and use it in both
+ // SCM and OM
public static final String OM_RATIS_SNAPSHOT_DIR = "snapshot";
+ public static final String SCM_RATIS_SNAPSHOT_DIR = "snapshot";
public static final long DEFAULT_OM_UPDATE_ID = -1L;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 13e08a1..6188262 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -18,15 +18,28 @@
package org.apache.hadoop.hdds.utils;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.protobuf.BlockingService;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -39,6 +52,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
@@ -487,4 +501,49 @@ public final class HddsServerUtil {
}
return metricsSystem;
}
+
+ /**
+ * Write DB Checkpoint to an output stream as a compressed file (tgz).
+ *
+ * @param checkpoint checkpoint file
+ * @param destination destination output stream.
+ * @throws IOException
+ */
+ public static void writeDBCheckpointToStream(DBCheckpoint checkpoint,
+ OutputStream destination)
+ throws IOException {
+ try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+ .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+ destination);
+ ArchiveOutputStream archiveOutputStream =
+ new TarArchiveOutputStream(gzippedOut);
+ Stream<Path> files =
+ Files.list(checkpoint.getCheckpointLocation())) {
+ for (Path path : files.collect(Collectors.toList())) {
+ if (path != null) {
+ Path fileName = path.getFileName();
+ if (fileName != null) {
+ includeFile(path.toFile(), fileName.toString(),
+ archiveOutputStream);
+ }
+ }
+ }
+ } catch (CompressorException e) {
+ throw new IOException(
+ "Can't compress the checkpoint: " +
+ checkpoint.getCheckpointLocation(), e);
+ }
+ }
+
+ private static void includeFile(File file, String entryName,
+ ArchiveOutputStream archiveOutputStream)
+ throws IOException {
+ ArchiveEntry archiveEntry =
+ archiveOutputStream.createArchiveEntry(file, entryName);
+ archiveOutputStream.putArchiveEntry(archiveEntry);
+ try (FileInputStream fis = new FileInputStream(file)) {
+ IOUtils.copy(fis, archiveOutputStream);
+ }
+ archiveOutputStream.closeArchiveEntry();
+ }
}
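The helper above tars every file in the checkpoint directory and gzips the result, so the receiving side can simply untar it (SCMSnapshotProvider below does this with FileUtil.unTar). A minimal sketch of dumping a checkpoint to a local file (the output path is illustrative):

    // Sketch only: write a DBCheckpoint as a tar.gz to a local path.
    static void dumpCheckpoint(DBCheckpoint checkpoint)
        throws IOException {
      try (OutputStream out = Files.newOutputStream(
          Paths.get("/tmp/scm-checkpoint.tar.gz"))) {
        HddsServerUtil.writeDBCheckpointToStream(checkpoint, out);
      }
    }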
diff --git a/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
index ba54a4f..18cdb6c 100644
--- a/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
@@ -21,4 +21,7 @@
<Match>
<Package name="org.apache.hadoop.hdds.protocol.proto"/>
</Match>
+ <Match>
+ <Package name="org.apache.hadoop.hdds.protocol.scm.proto"/>
+ </Match>
</FindBugsFilter>
diff --git a/hadoop-hdds/interface-client/pom.xml b/hadoop-hdds/interface-client/pom.xml
index 31a3960..dccb84b 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -71,6 +71,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
<includes>
<include>DatanodeClientProtocol.proto</include>
+ <include>InterSCMProtocol.proto</include>
</includes>
<outputDirectory>target/generated-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
@@ -119,6 +120,18 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
value="org.apache.ratis.thirdparty.com.google.common"
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
</replace>
+ <replace token="com.google.protobuf"
+ value="org.apache.ratis.thirdparty.com.google.protobuf"
+ dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+ </replace>
+ <replace token="io.grpc"
+ value="org.apache.ratis.thirdparty.io.grpc"
+ dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+ </replace>
+ <replace token="com.google.common"
+ value="org.apache.ratis.thirdparty.com.google.common"
+ dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+ </replace>
</tasks>
</configuration>
<goals>
diff --git a/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto
new file mode 100644
index 0000000..8c726c6
--- /dev/null
+++ b/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+syntax = "proto2";
+option java_package = "org.apache.hadoop.hdds.protocol.scm.proto";
+option java_outer_classname = "InterSCMProtocolProtos";
+option java_generate_equals_and_hash = true;
+
+
+message CopyDBCheckpointRequestProto {
+ required bool flush = 1;
+}
+
+message CopyDBCheckpointResponseProto {
+ required string clusterId = 1;
+ required uint64 len = 2;
+ required bool eof = 3;
+ required bytes data = 4;
+ required uint64 readOffset = 6;
+ optional int64 checksum = 7;
+}
+
+service InterSCMProtocolService {
+ // An inter SCM service to copy SCM DB checkpoint from leader to follower
+ rpc download (CopyDBCheckpointRequestProto) returns (stream CopyDBCheckpointResponseProto);
+}
\ No newline at end of file
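In the response stream above, data carries the chunk bytes, readOffset is the starting byte offset of the chunk within the whole transfer, len is the chunk length, and eof flags the final chunk. A hedged sketch of the invariant a client can check while reassembling (the chunk source `receivedChunks` and output stream `out` are assumptions, not part of this patch):

    // Sketch only: validate offsets while reassembling the stream.
    long expected = 0;
    for (CopyDBCheckpointResponseProto chunk : receivedChunks) {
      if (chunk.getReadOffset() != expected) {
        throw new IllegalStateException("Out-of-order chunk at offset "
            + chunk.getReadOffset());
      }
      out.write(chunk.getData().toByteArray());
      expected += chunk.getLen();
      if (chunk.getEof()) {
        break;  // last chunk of the checkpoint
      }
    }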
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
new file mode 100644
index 0000000..fc347e5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * gRPC client to download a RocksDB checkpoint from the leader node
+ * in an SCM HA ring.
+ */
+public class InterSCMGrpcClient implements SCMSnapshotDownloader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InterSCMGrpcClient.class);
+
+ private final ManagedChannel channel;
+
+ private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
+ client;
+
+ private final long timeout;
+
+ public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
+ Preconditions.checkNotNull(conf);
+ int port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+ timeout =
+ conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(host, port).usePlaintext()
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+ channel = channelBuilder.build();
+ client = InterSCMProtocolServiceGrpc.newStub(channel).
+ withDeadlineAfter(timeout, TimeUnit.SECONDS);
+ }
+
+
+ @Override
+ public CompletableFuture<Path> download(final Path outputPath) {
+ // By default, RocksDB is flushed before the checkpoint is taken
+ InterSCMProtocolProtos.CopyDBCheckpointRequestProto request =
+ InterSCMProtocolProtos.CopyDBCheckpointRequestProto.newBuilder()
+ .setFlush(true)
+ .build();
+ CompletableFuture<Path> response = new CompletableFuture<>();
+
+
+ client.download(request,
+ new StreamDownloader(response, outputPath));
+
+ return response;
+ }
+
+ public void shutdown() {
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("failed to shutdown replication channel", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ shutdown();
+ }
+
+ /**
+ * gRPC stream observer to CompletableFuture adapter.
+ */
+ public static class StreamDownloader
+ implements StreamObserver<CopyDBCheckpointResponseProto> {
+
+ private final CompletableFuture<Path> response;
+ private final OutputStream stream;
+ private final Path outputPath;
+
+ public StreamDownloader(CompletableFuture<Path> response,
+ Path outputPath) {
+ this.response = response;
+ this.outputPath = outputPath;
+ try {
+ Preconditions.checkNotNull(outputPath, "Output path cannot be null");
+ stream = new FileOutputStream(outputPath.toFile());
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ "Output path can't be used: " + outputPath, e);
+ }
+ }
+
+ @Override
+ public void onNext(CopyDBCheckpointResponseProto checkPoint) {
+ try {
+ checkPoint.getData().writeTo(stream);
+ } catch (IOException e) {
+ onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ LOG.error("Download of checkpoint {} was unsuccessful",
+ outputPath, throwable);
+ stream.close();
+ deleteOutputOnFailure();
+ response.completeExceptionally(throwable);
+ } catch (IOException e) {
+ LOG.error("Failed to close {}}",
+ outputPath, e);
+ response.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ try {
+ stream.close();
+ LOG.info("Checkpoint is downloaded to {}", outputPath);
+ response.complete(outputPath);
+ } catch (IOException e) {
+ LOG.error("Downloaded checkpoint OK, but failed to close {}",
+ outputPath, e);
+ response.completeExceptionally(e);
+ }
+
+ }
+
+ private void deleteOutputOnFailure() {
+ try {
+ Files.delete(outputPath);
+ } catch (IOException ex) {
+ LOG.error("Failed to delete destination {} for " +
+ "unsuccessful download",
+ outputPath, ex);
+ }
+ }
+ }
+
+}
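A usage sketch for the client (the leader host and target path are illustrative; InterSCMGrpcClient resolves the port and deadline from SCMHAConfiguration given a ConfigurationSource conf):

    // Sketch only: download the leader's checkpoint tarball and block
    // until the transfer completes. Checked exceptions elided.
    SCMSnapshotDownloader downloader =
        new InterSCMGrpcClient("scm-leader.example.com", conf);
    try {
      Path tarball =
          downloader.download(Paths.get("/tmp/scm.db.tar.gz")).get();
      LOG.info("Checkpoint tarball at {}", tarball);
    } finally {
      downloader.close();
    }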
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
new file mode 100644
index 0000000..9967530
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that serves SCM DB checkpoints for SCM HA.
+ * Ideally it should run only on the Ratis leader.
+ */
+public class InterSCMGrpcProtocolService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InterSCMGrpcProtocolService.class);
+
+ private final int port;
+ private Server server;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public InterSCMGrpcProtocolService(final ConfigurationSource conf,
+ final StorageContainerManager scm) {
+ Preconditions.checkNotNull(conf);
+ this.port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+
+ NettyServerBuilder nettyServerBuilder =
+ ((NettyServerBuilder) ServerBuilder.forPort(port))
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+ InterSCMGrpcService service = new InterSCMGrpcService(scm);
+ ServerBuilder b = nettyServerBuilder.addService(service);
+ Preconditions.checkNotNull(b);
+ server = nettyServerBuilder.build();
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public void start() throws IOException {
+ if (isStarted.compareAndSet(false, true)) {
+ server.start();
+ } else {
+ LOG.info("Ignoring start request; server is already running.");
+ }
+ }
+
+ public void stop() {
+ if (isStarted.get()) {
+ server.shutdown();
+ try {
+ server.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("failed to shutdown XceiverServerGrpc", e);
+ }
+ isStarted.set(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java
new file mode 100644
index 0000000..726f566
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointRequestProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to handle RocksDB checkpoint download requests.
+ */
+public class InterSCMGrpcService extends
+ InterSCMProtocolServiceGrpc.InterSCMProtocolServiceImplBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InterSCMGrpcService.class);
+
+ private static final int BUFFER_SIZE = 1024 * 1024;
+
+ private final String clusterId;
+ private final SCMDBCheckpointProvider provider;
+
+ public InterSCMGrpcService(final StorageContainerManager scm) {
+ Preconditions.checkNotNull(scm);
+ this.clusterId = scm.getClusterId();
+ provider =
+ new SCMDBCheckpointProvider(scm.getScmMetadataStore().getStore());
+ }
+
+ @Override
+ public void download(CopyDBCheckpointRequestProto request,
+ StreamObserver<CopyDBCheckpointResponseProto> responseObserver) {
+ try {
+ SCMGrpcOutputStream outputStream =
+ new SCMGrpcOutputStream(responseObserver, clusterId, BUFFER_SIZE);
+ provider.writeDBCheckPointToStream(outputStream, request.getFlush());
+
+ } catch (IOException e) {
+ LOG.error("Error streaming SCM DB checkpoint", e);
+ responseObserver.onError(e);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 116994e..55818c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -97,6 +97,11 @@ public final class MockSCMHAManager implements SCMHAManager {
return transactionBuffer;
}
+ @Override
+ public SCMSnapshotProvider getSCMSnapshotProvider() {
+ return null;
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
new file mode 100644
index 0000000..d588bb9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+
+
+// TODO: define a generic interface for this
+public class SCMDBCheckpointProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMDBCheckpointProvider.class);
+ private transient DBStore scmDbStore;
+
+ public SCMDBCheckpointProvider(DBStore scmDbStore) {
+ this.scmDbStore = scmDbStore;
+ }
+
+ public void writeDBCheckPointToStream(OutputStream stream, boolean flush)
+ throws IOException {
+ LOG.info("Received request to obtain OM DB checkpoint snapshot");
+ if (scmDbStore == null) {
+ LOG.error("Unable to process checkpointing request. DB Store is null");
+ return;
+ }
+
+ DBCheckpoint checkpoint = null;
+ try {
+
+ checkpoint = scmDbStore.getCheckpoint(flush);
+ if (checkpoint == null || checkpoint.getCheckpointLocation() == null) {
+ throw new IOException("Unable to process metadata snapshot request. "
+ + "Checkpoint request returned null.");
+ }
+
+ Path file = checkpoint.getCheckpointLocation().getFileName();
+ if (file == null) {
+ return;
+ }
+
+ Instant start = Instant.now();
+ HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream);
+ Instant end = Instant.now();
+
+ long duration = Duration.between(start, end).toMillis();
+ LOG.info("Time taken to write the checkpoint to response output " +
+ "stream: {} milliseconds", duration);
+
+ } catch (IOException ioe) {
+ LOG.error("Unable to process metadata snapshot request. ", ioe);
+ throw ioe;
+ } finally {
+ if (checkpoint != null) {
+ try {
+ checkpoint.cleanupCheckpoint();
+ } catch (IOException e) {
+ LOG.error("Error trying to clean checkpoint at {} .",
+ checkpoint.getCheckpointLocation().toString());
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java
new file mode 100644
index 0000000..1194a52
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Stream over which the tarred DB checkpoint is transferred to the
+ * destination via gRPC.
+ * TODO: Make it a generic utility to be used both during container replication
+ * as well as SCM checkpoint transfer
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
+ * Data is buffered in a limited buffer of the specified size.
+ */
+class SCMGrpcOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMGrpcOutputStream.class);
+
+ private final StreamObserver<InterSCMProtocolProtos.
+ CopyDBCheckpointResponseProto> responseObserver;
+
+ private final ByteString.Output buffer;
+
+ private final String clusterId;
+
+ private final int bufferSize;
+
+ private long writtenBytes;
+
+ SCMGrpcOutputStream(
+ StreamObserver<InterSCMProtocolProtos.
+ CopyDBCheckpointResponseProto> responseObserver,
+ String clusterId, int bufferSize) {
+ this.responseObserver = responseObserver;
+ this.clusterId = clusterId;
+ this.bufferSize = bufferSize;
+ buffer = ByteString.newOutput(bufferSize);
+ }
+
+ @Override public void write(int b) {
+ try {
+ buffer.write(b);
+ if (buffer.size() >= bufferSize) {
+ flushBuffer(false);
+ }
+ } catch (Exception ex) {
+ responseObserver.onError(ex);
+ }
+ }
+
+ @Override public void write(@Nonnull byte[] data, int offset, int length) {
+ if ((offset < 0) || (offset > data.length) || (length < 0) || (
+ (offset + length) > data.length) || ((offset + length) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (length == 0) {
+ return;
+ }
+
+ try {
+ if (buffer.size() >= bufferSize) {
+ flushBuffer(false);
+ }
+
+ int remaining = length;
+ int off = offset;
+ int len = Math.min(remaining, bufferSize - buffer.size());
+ while (remaining > 0) {
+ buffer.write(data, off, len);
+ if (buffer.size() >= bufferSize) {
+ flushBuffer(false);
+ }
+ off += len;
+ remaining -= len;
+ len = Math.min(bufferSize, remaining);
+ }
+ } catch (Exception ex) {
+ responseObserver.onError(ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushBuffer(true);
+ LOG.info("Sent {} bytes for cluster {}", writtenBytes, clusterId);
+ responseObserver.onCompleted();
+ buffer.close();
+ }
+
+ private void flushBuffer(boolean eof) {
+ int length = buffer.size();
+ if (length > 0) {
+ ByteString data = buffer.toByteString();
+ LOG.debug("Sending {} bytes (of type {})", length,
+ data.getClass().getSimpleName());
+ InterSCMProtocolProtos.CopyDBCheckpointResponseProto response =
+ InterSCMProtocolProtos.CopyDBCheckpointResponseProto.newBuilder()
+ .setClusterId(clusterId).setData(data).setEof(eof)
+ .setReadOffset(writtenBytes).setLen(length).build();
+ responseObserver.onNext(response);
+ writtenBytes += length;
+ buffer.reset();
+ }
+ }
+}
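SCMGrpcOutputStream sends a CopyDBCheckpointResponseProto whenever its internal buffer fills, and flushes the remainder with eof=true on close(). A rough illustration of the chunking (the counting observer is an assumption; the class is package-private, so this presumes code in the same package):

    // Sketch only: 2.5 buffers of data should yield three chunks --
    // two full 1024-byte chunks plus a final 512-byte eof chunk.
    AtomicInteger chunks = new AtomicInteger();
    StreamObserver<InterSCMProtocolProtos.CopyDBCheckpointResponseProto>
        observer = new StreamObserver<InterSCMProtocolProtos
            .CopyDBCheckpointResponseProto>() {
          @Override
          public void onNext(
              InterSCMProtocolProtos.CopyDBCheckpointResponseProto r) {
            chunks.incrementAndGet();
          }
          @Override
          public void onError(Throwable t) {
          }
          @Override
          public void onCompleted() {
          }
        };
    try (SCMGrpcOutputStream out =
        new SCMGrpcOutputStream(observer, "cluster-1", 1024)) {
      out.write(new byte[2560]);
    }
    // chunks.get() == 3 after close()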
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
index 867e8d0..f093e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -177,10 +177,47 @@ public class SCMHAConfiguration {
)
private long ratisRoleCheckerInterval = 15 * 1000L;
+ @Config(key = "ratis.snapshot.dir",
+ type = ConfigType.STRING,
+ defaultValue = "",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The ratis snapshot dir location"
+ )
+ private String ratisSnapshotDir;
+
+ @Config(key = "grpc.bind.port",
+ type = ConfigType.INT,
+ defaultValue = "9899",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Port used by SCM for Grpc Server."
+ )
+ // TODO: fix the default grpc port
+ private int grpcBindPort = 9899;
+
+ @Config(key = "grpc.deadline.interval",
+ type = ConfigType.TIME,
+ defaultValue = "30m",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Deadline for SCM DB checkpoint interval."
+ )
+ private long grpcDeadlineInterval = 30 * 60 * 1000L;
+
+ public long getGrpcDeadlineInterval() {
+ return grpcDeadlineInterval;
+ }
+
+ public int getGrpcBindPort() {
+ return grpcBindPort;
+ }
+
public String getRatisStorageDir() {
return ratisStorageDir;
}
+ public String getRatisSnapshotDir() {
+ return ratisSnapshotDir;
+ }
+
public void setRatisStorageDir(String dir) {
this.ratisStorageDir = dir;
}
@@ -217,6 +254,10 @@ public class SCMHAConfiguration {
this.raftLogPurgeEnabled = enabled;
}
+ public void setGrpcBindPort(int port) {
+ this.grpcBindPort = port;
+ }
+
public int getRaftLogPurgeGap() {
return raftLogPurgeGap;
}
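The new knobs ride on SCMHAConfiguration, so they follow the same getObject/setFromObject round trip the test below uses for the snapshot threshold. A sketch (the chosen port is illustrative):

    // Sketch only: override the inter-SCM gRPC port (default 9899).
    OzoneConfiguration conf = new OzoneConfiguration();
    SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
    haConf.setGrpcBindPort(9898);
    conf.setFromObject(haConf);
    assert conf.getObject(SCMHAConfiguration.class)
        .getGrpcBindPort() == 9898;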
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index c8b3ff7..4eb2eed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -35,6 +35,11 @@ public interface SCMHAManager {
SCMRatisServer getRatisServer();
/**
+ * Returns SCM snapshot provider.
+ */
+ SCMSnapshotProvider getSCMSnapshotProvider();
+
+ /**
* Returns DB transaction buffer.
*/
DBTransactionBuffer getDBTransactionBuffer();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 1880363..b49ba74 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
private final SCMRatisServer ratisServer;
private final ConfigurationSource conf;
private final SCMDBTransactionBuffer transactionBuffer;
+ private final SCMSnapshotProvider scmSnapshotProvider;
+
+ // this should ideally be started only in a ratis leader
+ private final InterSCMGrpcProtocolService grpcServer;
/**
* Creates SCMHAManager instance.
@@ -51,6 +57,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
new SCMDBTransactionBuffer(scm);
this.ratisServer = new SCMRatisServerImpl(
conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
+ this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
+ scm.getSCMHANodeDetails().getPeerNodeDetails());
+ grpcServer = new InterSCMGrpcProtocolService(conf, scm);
+
}
/**
@@ -59,6 +69,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
@Override
public void start() throws IOException {
ratisServer.start();
+ grpcServer.start();
}
public SCMRatisServer getRatisServer() {
@@ -70,11 +81,76 @@ public class SCMHAManagerImpl implements SCMHAManager {
return transactionBuffer;
}
+ @Override
+ public SCMSnapshotProvider getSCMSnapshotProvider() {
+ return scmSnapshotProvider;
+ }
+
+ /**
+ * Download and install the latest checkpoint from the leader SCM.
+ *
+ * @param leaderId peerNodeID of the leader SCM
+ * @return If the checkpoint is installed successfully, return the
+ * corresponding termIndex. Otherwise, return null.
+ */
+ public TermIndex installSnapshotFromLeader(String leaderId) {
+ if (scmSnapshotProvider == null) {
+ LOG.error("SCM Snapshot Provider is not configured as there are no " +
+ "peer nodes.");
+ return null;
+ }
+
+ DBCheckpoint dbCheckpoint = getDBCheckpointFromLeader(leaderId);
+ LOG.info("Downloaded checkpoint from Leader {} to the location {}",
+ leaderId, dbCheckpoint.getCheckpointLocation());
+
+ TermIndex termIndex = null;
+ try {
+ termIndex = installCheckpoint(leaderId, dbCheckpoint);
+ } catch (Exception ex) {
+ LOG.error("Failed to install snapshot from Leader SCM.", ex);
+ }
+ return termIndex;
+ }
+
+ /**
+ * Install checkpoint. If the checkpoint's snapshot index is greater than
+ * SCM's last applied transaction index, then re-initialize the SCM
+ * state via this checkpoint. Before re-initializing SCM state, the SCM
+ * Ratis server should be stopped so that no new transactions can be applied.
+ */
+ TermIndex installCheckpoint(String leaderId, DBCheckpoint dbCheckpoint)
+ throws Exception {
+ // TODO : implement install checkpoint
+ return null;
+ }
+
+
+ /**
+ * Download the latest SCM DB checkpoint from the leader SCM.
+ *
+ * @param leaderId SCMNodeID of the leader SCM node.
+ * @return latest DB checkpoint from the leader SCM.
+ */
+ private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+ LOG.info("Downloading checkpoint from leader SCM {} and reloading state " +
+ "from the checkpoint.", leaderId);
+
+ try {
+ return scmSnapshotProvider.getSCMDBSnapshot(leaderId);
+ } catch (IOException e) {
+ LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
+ }
+ return null;
+ }
+
+
/**
* {@inheritDoc}
*/
@Override
public void shutdown() throws IOException {
ratisServer.stop();
+ grpcServer.stop();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 48946b4..b61023f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -18,16 +18,27 @@
package org.apache.hadoop.hdds.scm.ha;
+import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.file.Paths;
import java.util.Collection;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
+
/**
* Utility class used by SCM HA.
*/
public final class SCMHAUtils {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMHAUtils.class);
private SCMHAUtils() {
// not used
}
@@ -62,4 +73,32 @@ public final class SCMHAUtils {
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}
+
+ /**
+ * Get the local directory where ratis logs will be stored.
+ */
+ public static String getSCMRatisDirectory(ConfigurationSource conf) {
+ String scmRatisDirectory =
+ conf.getObject(SCMHAConfiguration.class).getRatisStorageDir();
+
+ if (Strings.isNullOrEmpty(scmRatisDirectory)) {
+ scmRatisDirectory = ServerUtils.getDefaultRatisDirectory(conf);
+ }
+ return scmRatisDirectory;
+ }
+
+ public static String getSCMRatisSnapshotDirectory(ConfigurationSource conf) {
+ String snapshotDir =
+ conf.getObject(SCMHAConfiguration.class).getRatisSnapshotDir();
+
+ // If ratis snapshot directory is not set, fall back to ozone.metadata.dir.
+ if (Strings.isNullOrEmpty(snapshotDir)) {
+ LOG.warn("SCM snapshot dir is not configured. Falling back to {} config",
+ OZONE_METADATA_DIRS);
+ File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+ snapshotDir =
+ Paths.get(metaDirPath.getPath(), OM_RATIS_SNAPSHOT_DIR).toString();
+ }
+ return snapshotDir;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
similarity index 61%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
index c8b3ff7..7d5d3eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
@@ -14,33 +14,30 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.hadoop.hdds.scm.ha;
import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
/**
- * SCMHAManager provides HA service for SCM.
+ * Contract to download an SCM snapshot from a remote server.
+ * <p>
+ * The underlying implementation is expected to download the SCM snapshot
+ * via any chosen protocol (for now it is gRPC).
*/
-public interface SCMHAManager {
-
- /**
- * Starts HA service.
- */
- void start() throws IOException;
+public interface SCMSnapshotDownloader {
/**
- * Returns RatisServer instance associated with the SCM instance.
+ * Downloads the contents to the target file path.
+ *
+ * @param destination local path where the snapshot tarball is written
+ * @return Future task for download progress
+ * @throws IOException
*/
- SCMRatisServer getRatisServer();
+ CompletableFuture<Path> download(Path destination) throws IOException;
- /**
- * Returns DB transaction buffer.
- */
- DBTransactionBuffer getDBTransactionBuffer();
-
- /**
- * Stops the HA service.
- */
- void shutdown() throws IOException;
-}
+ void close() throws Exception;
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
new file mode 100644
index 0000000..c0e1db3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SCMSnapshotProvider downloads the latest checkpoint from the
+ * leader SCM and loads the checkpoint into the state machine.
+ */
+public class SCMSnapshotProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMSnapshotProvider.class);
+
+ private final File scmSnapshotDir;
+
+
+ private final ConfigurationSource conf;
+
+ private SCMSnapshotDownloader client;
+
+ private Map<String, SCMNodeDetails> peerNodesMap;
+
+ public SCMSnapshotProvider(ConfigurationSource conf,
+ List<SCMNodeDetails> peerNodes) {
+ LOG.info("Initializing SCM Snapshot Provider");
+ this.conf = conf;
+ // Create Ratis storage dir
+ String scmRatisDirectory = SCMHAUtils.getSCMRatisDirectory(conf);
+
+ if (scmRatisDirectory == null || scmRatisDirectory.isEmpty()) {
+ throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+ " must be defined.");
+ }
+ HddsUtils.createDir(scmRatisDirectory);
+
+ // Create Ratis snapshot dir
+ scmSnapshotDir = HddsUtils.createDir(
+ SCMHAUtils.getSCMRatisSnapshotDirectory(conf));
+ if (peerNodes != null) {
+ this.peerNodesMap = new HashMap<>();
+ for (SCMNodeDetails peerNode : peerNodes) {
+ this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
+ }
+ }
+ this.client = null;
+ }
+
+ @VisibleForTesting
+ public void setPeerNodesMap(Map<String, SCMNodeDetails> peerNodesMap) {
+ this.peerNodesMap = peerNodesMap;
+ }
+ /**
+ * Download the latest checkpoint from the SCM leader.
+ * @param leaderSCMNodeID leader SCM Node ID.
+ * @return the DB checkpoint (including the ratis snapshot index)
+ */
+ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID)
+ throws IOException {
+ String snapshotTime = Long.toString(System.currentTimeMillis());
+ String snapshotFileName =
+ OzoneConsts.SCM_DB_NAME + "-" + leaderSCMNodeID + "-" + snapshotTime;
+ String snapshotFilePath =
+ Paths.get(scmSnapshotDir.getAbsolutePath(), snapshotFileName).toFile()
+ .getAbsolutePath();
+ File targetFile = new File(snapshotFilePath + ".tar.gz");
+
+ // The client instance is initialized only when the first install-snapshot
+ // notification is received from the Ratis leader.
+ if (client == null) {
+ client = new InterSCMGrpcClient(
+ peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
+ conf);
+ }
+ try {
+ client.download(targetFile.toPath()).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Rocks DB checkpoint downloading failed", e);
+ throw new IOException(e);
+ }
+
+
+ // Untar the checkpoint file.
+ Path untarredDbDir = Paths.get(snapshotFilePath);
+ FileUtil.unTar(targetFile, untarredDbDir.toFile());
+ FileUtils.deleteQuietly(targetFile);
+
+ LOG.info("Successfully downloaded latest checkpoint from leader SCM: {}",
+ leaderSCMNodeID);
+
+ RocksDBCheckpoint scmCheckpoint = new RocksDBCheckpoint(untarredDbDir);
+ return scmCheckpoint;
+ }
+
+ @VisibleForTesting
+ public File getScmSnapshotDir() {
+ return scmSnapshotDir;
+ }
+
+ public void stop() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
new file mode 100644
index 0000000..3768d95
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMSnapshotProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Class to test install snapshot feature for SCM HA.
+ */
+public class TestSCMInstallSnapshot {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ SCMHAConfiguration scmhaConfiguration = conf.getObject(
+ SCMHAConfiguration.class);
+ scmhaConfiguration.setRatisSnapshotThreshold(1L);
+ conf.setFromObject(scmhaConfiguration);
+ cluster = MiniOzoneCluster
+ .newBuilder(conf)
+ .setNumDatanodes(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testInstallSnapshot() throws Exception {
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ ContainerManagerV2 containerManager = scm.getContainerManager();
+ PipelineManager pipelineManager = scm.getPipelineManager();
+ Pipeline ratisPipeline1 = pipelineManager.getPipeline(
+ containerManager.allocateContainer(
+ RATIS, THREE, "Owner1").getPipelineID());
+ pipelineManager.openPipeline(ratisPipeline1.getId());
+ Pipeline ratisPipeline2 = pipelineManager.getPipeline(
+ containerManager.allocateContainer(
+ RATIS, ONE, "Owner2").getPipelineID());
+ pipelineManager.openPipeline(ratisPipeline2.getId());
+ SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
+ .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1")
+ .build();
+ Map<String, SCMNodeDetails> peerMap = new HashMap<>();
+ peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails);
+ SCMSnapshotProvider provider =
+ scm.getScmHAManager().getSCMSnapshotProvider();
+ provider.setPeerNodesMap(peerMap);
+ provider.getSCMDBSnapshot(scmNodeDetails.getNodeId());
+ final File[] files = FileUtil.listFiles(provider.getScmSnapshotDir());
+ Assert.assertEquals(1, files.length);
+ Assert.assertTrue(files[0].getName().startsWith(
+ OzoneConsts.SCM_DB_NAME + "-" + scmNodeDetails.getNodeId()));
+ }
+}