You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/04 18:12:06 UTC
hadoop git commit: HDDS-75. Support for CopyContainer. Contributed by
Elek, Marton.
Repository: hadoop
Updated Branches:
refs/heads/trunk 9e96ac666 -> b9932162e
HDDS-75. Support for CopyContainer. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9932162
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9932162
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9932162
Branch: refs/heads/trunk
Commit: b9932162e9eb4acc9c790fc3c4938a5057fc1658
Parents: 9e96ac6
Author: Nanda kumar <na...@apache.org>
Authored: Tue Sep 4 23:41:50 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Tue Sep 4 23:41:50 2018 +0530
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneConfigKeys.java | 3 +
.../main/proto/DatanodeContainerProtocol.proto | 12 +-
.../common/src/main/resources/ozone-default.xml | 10 +-
.../container/common/interfaces/Handler.java | 15 +-
.../statemachine/DatanodeStateMachine.java | 25 +--
.../ReplicateContainerCommandHandler.java | 124 ++++++++++++-
.../transport/server/XceiverServerGrpc.java | 12 +-
.../container/keyvalue/KeyValueContainer.java | 3 +-
.../container/keyvalue/KeyValueHandler.java | 100 ++++++-----
.../container/ozoneimpl/OzoneContainer.java | 9 +-
.../replication/ContainerDownloader.java | 40 +++++
.../replication/ContainerReplicationSource.java | 49 ++++++
.../replication/ContainerStreamingOutput.java | 45 +++++
.../replication/GrpcReplicationClient.java | 169 ++++++++++++++++++
.../replication/GrpcReplicationService.java | 130 ++++++++++++++
.../OnDemandContainerReplicationSource.java | 76 ++++++++
.../replication/SimpleContainerDownloader.java | 121 +++++++++++++
.../container/replication/package-info.java | 21 +++
.../TestReplicateContainerCommandHandler.java | 146 ++++++++++++++++
.../commandhandler/package-info.java | 22 +++
.../container/TestContainerReplication.java | 175 +++++++++++++++++++
.../TestReplicateContainerHandler.java | 70 --------
.../container/ozoneimpl/TestOzoneContainer.java | 4 +-
23 files changed, 1244 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8f53da5..0f2b108 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -265,6 +265,9 @@ public final class OzoneConfigKeys {
public static final long
HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
+ public static final String OZONE_CONTAINER_COPY_WORKDIR =
+ "hdds.datanode.replication.work.dir";
+
/**
* There is no need to instantiate this class.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 930f314..ba0d2d4 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -430,16 +430,22 @@ message CopyContainerRequestProto {
}
message CopyContainerResponseProto {
- required string archiveName = 1;
+ required int64 containerID = 1;
required uint64 readOffset = 2;
required uint64 len = 3;
required bool eof = 4;
- repeated bytes data = 5;
+ required bytes data = 5;
optional int64 checksum = 6;
}
service XceiverClientProtocolService {
// A client-to-datanode RPC to send container commands
rpc send(stream ContainerCommandRequestProto) returns
- (stream ContainerCommandResponseProto) {}
+ (stream ContainerCommandResponseProto) {};
+
+}
+
+service IntraDatanodeProtocolService {
+ // An intradatanode service to copy the raw containerdata betwen nodes
+ rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a9fd10b..ca3da41 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1124,4 +1124,12 @@
on. Right now, we have SSD and DISK as profile options.</description>
</property>
-</configuration>
\ No newline at end of file
+ <property>
+ <name>hdds.datanode.replication.work.dir</name>
+ <tag>DATANODE</tag>
+ <description>Temporary which is used during the container replication
+ betweeen datanodes. Should have enough space to store multiple container
+ (in compressed format), but doesn't require fast io access such as SSD.
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 15eed4f..53e1c68 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.ozone.container.common.interfaces;
+import java.io.FileInputStream;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
@@ -30,7 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -67,6 +70,16 @@ public abstract class Handler {
public abstract ContainerCommandResponseProto handle(
ContainerCommandRequestProto msg, Container container);
+ /**
+ * Import container data from a raw input stream.
+ */
+ public abstract Container importContainer(
+ long containerID,
+ long maxSize,
+ FileInputStream rawContainerStream,
+ TarContainerPacker packer)
+ throws IOException;
+
public void setScmID(String scmId) {
this.scmID = scmId;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 0a23912..875d063 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -16,12 +16,17 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -39,15 +44,12 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* State Machine Class.
*/
@@ -87,13 +89,14 @@ public class DatanodeStateMachine implements Closeable {
new OzoneConfiguration(conf), context);
nextHB = new AtomicLong(Time.monotonicNow());
- // When we add new handlers just adding a new handler here should do the
+ // When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
- .addHandler(new ReplicateContainerCommandHandler())
+ .addHandler(new ReplicateContainerCommandHandler(conf,
+ container.getContainerSet(), container.getDispatcher()))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index fe1d4e8..d1895a8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -16,14 +16,32 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
@@ -33,22 +51,120 @@ import org.slf4j.LoggerFactory;
* Command handler to copy containers from sources.
*/
public class ReplicateContainerCommandHandler implements CommandHandler {
+
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
+ private ContainerDispatcher containerDispatcher;
+
private int invocationCount;
private long totalTime;
- private boolean cmdExecuted;
+
+ private ContainerDownloader downloader;
+
+ private Configuration conf;
+
+ private TarContainerPacker packer = new TarContainerPacker();
+
+ private ContainerSet containerSet;
+
+ private Lock lock = new ReentrantLock();
+
+ public ReplicateContainerCommandHandler(
+ Configuration conf,
+ ContainerSet containerSet,
+ ContainerDispatcher containerDispatcher,
+ ContainerDownloader downloader) {
+ this.conf = conf;
+ this.containerSet = containerSet;
+ this.downloader = downloader;
+ this.containerDispatcher = containerDispatcher;
+ }
+
+ public ReplicateContainerCommandHandler(
+ Configuration conf,
+ ContainerSet containerSet,
+ ContainerDispatcher containerDispatcher) {
+ this(conf, containerSet, containerDispatcher,
+ new SimpleContainerDownloader(conf));
+ }
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
- LOG.warn("Replicate command is not yet handled");
+
+ ReplicateContainerCommand replicateCommand =
+ (ReplicateContainerCommand) command;
try {
- cmdExecuted = true;
+
+ long containerID = replicateCommand.getContainerID();
+ LOG.info("Starting replication of container {} from {}", containerID,
+ replicateCommand.getSourceDatanodes());
+ CompletableFuture<Path> tempTarFile = downloader
+ .getContainerDataFromReplicas(containerID,
+ replicateCommand.getSourceDatanodes());
+
+ CompletableFuture<Void> result =
+ tempTarFile.thenAccept(path -> {
+ LOG.info("Container {} is downloaded, starting to import.",
+ containerID);
+ importContainer(containerID, path);
+ });
+
+ result.whenComplete((aVoid, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Container replication was unsuccessful .", throwable);
+ } else {
+ LOG.info("Container {} is replicated successfully", containerID);
+ }
+ });
+ } finally {
+ updateCommandStatus(context, command, true, LOG);
+
+ }
+ }
+
+ protected void importContainer(long containerID, Path tarFilePath) {
+ lock.lock();
+ try {
+ ContainerData originalContainerData;
+ try (FileInputStream tempContainerTarStream = new FileInputStream(
+ tarFilePath.toFile())) {
+ byte[] containerDescriptorYaml =
+ packer.unpackContainerDescriptor(tempContainerTarStream);
+ originalContainerData = ContainerDataYaml.readContainer(
+ containerDescriptorYaml);
+ }
+
+ try (FileInputStream tempContainerTarStream = new FileInputStream(
+ tarFilePath.toFile())) {
+
+ Handler handler = containerDispatcher.getHandler(
+ originalContainerData.getContainerType());
+
+ Container container = handler.importContainer(containerID,
+ originalContainerData.getMaxSize(),
+ tempContainerTarStream,
+ packer);
+
+ containerSet.addContainer(container);
+ }
+
+ } catch (Exception e) {
+ LOG.error(
+ "Can't import the downloaded container data id=" + containerID,
+ e);
+ try {
+ Files.delete(tarFilePath);
+ } catch (Exception ex) {
+ LOG.error(
+ "Container import is failed and the downloaded file can't be "
+ + "deleted: "
+ + tarFilePath.toAbsolutePath().toString());
+ }
} finally {
- updateCommandStatus(context, command, cmdExecuted, LOG);
+ lock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4dc232d..4a90144 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+
+import org.apache.ratis.shaded.io.grpc.BindableService;
import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
@@ -54,7 +56,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
* @param conf - Configuration
*/
public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
- ContainerDispatcher dispatcher) {
+ ContainerDispatcher dispatcher, BindableService... additionalServices) {
Preconditions.checkNotNull(conf);
this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
@@ -80,6 +82,14 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
.maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher))
.build();
+ NettyServerBuilder nettyServerBuilder =
+ ((NettyServerBuilder) ServerBuilder.forPort(port))
+ .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .addService(new GrpcXceiverService(dispatcher));
+ for (BindableService service : additionalServices) {
+ nettyServerBuilder.addService(service);
+ }
+ server = nettyServerBuilder.build();
storageContainer = dispatcher;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index f60c5e9..b893a38 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.io.nativeio.NativeIO;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e254833..5acecb4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -18,9 +18,15 @@
package org.apache.hadoop.ozone.container.keyvalue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
@@ -35,80 +41,72 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine
- .background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
+ .BlockDeletingService;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.CONTAINER_INTERNAL_ERROR;
+ .Result.BLOCK_NOT_COMMITTED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.IO_EXCEPTION;
+ .Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.GET_SMALL_FILE_ERROR;
+ .Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .Result.BLOCK_NOT_COMMITTED;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Stage;
-import static org.apache.hadoop.ozone
- .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone
- .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone
- .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone
- .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handler for KeyValue Container type.
@@ -831,4 +829,22 @@ public class KeyValueHandler extends Handler {
throw new StorageContainerException(msg, result);
}
}
+
+ public Container importContainer(long containerID, long maxSize,
+ FileInputStream rawContainerStream,
+ TarContainerPacker packer)
+ throws IOException {
+
+ KeyValueContainerData containerData =
+ new KeyValueContainerData(containerID,
+ maxSize);
+
+ KeyValueContainer container = new KeyValueContainer(containerData,
+ conf);
+
+ populateContainerPathFields(container, maxSize);
+ container.importContainerData(rawContainerStream, packer);
+ return container;
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 812777b..b1bf381 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -35,6 +35,9 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication
+ .OnDemandContainerReplicationSource;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@ public class OzoneContainer {
context);
server = new XceiverServerSpi[]{
new XceiverServerGrpc(datanodeDetails, this.config, this
- .hddsDispatcher),
+ .hddsDispatcher, createReplicationService()),
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
.config, hddsDispatcher)
};
@@ -87,6 +90,10 @@ public class OzoneContainer {
}
+ private GrpcReplicationService createReplicationService() {
+ return new GrpcReplicationService(
+ new OnDemandContainerReplicationSource(containerSet));
+ }
/**
* Build's container map.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
new file mode 100644
index 0000000..9511241
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.Closeable;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * Service to download container data from other datanodes.
+ * <p>
+ * The implementation of this interface should copy the raw container data in
+ * compressed form to working directory.
+ * <p>
+ * A smart implementation would use multiple sources to do parallel download.
+ */
+public interface ContainerDownloader extends Closeable {
+
+ CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+ List<DatanodeDetails> sources);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
new file mode 100644
index 0000000..69582f7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Contract to prepare provide the container in binary form..
+ * <p>
+ * Prepare will be called when container is closed. An implementation could
+ * precache any binary representation of a container and store the pre packede
+ * images.
+ */
+public interface ContainerReplicationSource {
+
+ /**
+ * Prepare for the replication.
+ *
+ * @param containerId The name of the container the package.
+ */
+ void prepare(long containerId);
+
+ /**
+ * Copy the container data to an output stream.
+ *
+ * @param containerId Container to replicate
+ * @param destination The destination stream to copy all the container data.
+ * @throws IOException
+ */
+ void copyData(long containerId, OutputStream destination)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
new file mode 100644
index 0000000..f7fd8a4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * JAX-RS streaming output to return the binary container data.
+ */
+public class ContainerStreamingOutput implements StreamingOutput {
+
+ private long containerId;
+
+ private ContainerReplicationSource containerReplicationSource;
+
+ public ContainerStreamingOutput(long containerId,
+ ContainerReplicationSource containerReplicationSource) {
+ this.containerId = containerId;
+ this.containerReplicationSource = containerReplicationSource;
+ }
+
+ @Override
+ public void write(OutputStream outputStream)
+ throws IOException, WebApplicationException {
+ containerReplicationSource.copyData(containerId, outputStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
new file mode 100644
index 0000000..91d098f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.base.Preconditions;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to read container data from Grpc.
+ */
+public class GrpcReplicationClient {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcReplicationClient.class);
+
+ private final ManagedChannel channel;
+
+ private final IntraDatanodeProtocolServiceStub client;
+
+ private final Path workingDirectory;
+
+ public GrpcReplicationClient(String host,
+ int port, Path workingDir) {
+
+ channel = NettyChannelBuilder.forAddress(host, port)
+ .usePlaintext()
+ .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .build();
+ client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
+ this.workingDirectory = workingDir;
+
+ }
+
+ public CompletableFuture<Path> download(long containerId) {
+ CopyContainerRequestProto request =
+ CopyContainerRequestProto.newBuilder()
+ .setContainerID(containerId)
+ .setLen(-1)
+ .setReadOffset(0)
+ .build();
+
+ CompletableFuture<Path> response = new CompletableFuture<>();
+
+ Path destinationPath =
+ getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
+
+ client.download(request,
+ new StreamDownloader(containerId, response, destinationPath));
+ return response;
+ }
+
+ private Path getWorkingDirectory() {
+ return workingDirectory;
+ }
+
+ public void shutdown() {
+ channel.shutdown();
+ }
+
+ /**
+ * Grpc stream observer to ComletableFuture adapter.
+ */
+ public static class StreamDownloader
+ implements StreamObserver<CopyContainerResponseProto> {
+
+ private final CompletableFuture<Path> response;
+
+ private final long containerId;
+
+ private BufferedOutputStream stream;
+
+ private Path outputPath;
+
+ public StreamDownloader(long containerId, CompletableFuture<Path> response,
+ Path outputPath) {
+ this.response = response;
+ this.containerId = containerId;
+ this.outputPath = outputPath;
+ try {
+ outputPath = Preconditions.checkNotNull(outputPath);
+ Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
+ Files.createDirectories(parentPath);
+ stream =
+ new BufferedOutputStream(new FileOutputStream(outputPath.toFile()));
+ } catch (IOException e) {
+ throw new RuntimeException("OutputPath can't be used: " + outputPath,
+ e);
+ }
+
+ }
+
+ @Override
+ public void onNext(CopyContainerResponseProto chunk) {
+ try {
+ stream.write(chunk.getData().toByteArray());
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ try {
+ stream.close();
+ LOG.error("Container download was unsuccessfull", throwable);
+ try {
+ Files.delete(outputPath);
+ } catch (IOException ex) {
+ LOG.error(
+ "Error happened during the download but can't delete the "
+ + "temporary destination.", ex);
+ }
+ response.completeExceptionally(throwable);
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ try {
+ stream.close();
+ response.complete(outputPath);
+ LOG.info("Container is downloaded to {}", outputPath);
+ } catch (IOException e) {
+ response.completeExceptionally(e);
+ }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
new file mode 100644
index 0000000..d8f696f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .IntraDatanodeProtocolServiceGrpc;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to make containers available for replication.
+ */
+public class GrpcReplicationService extends
+ IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcReplicationService.class);
+
+ private final ContainerReplicationSource containerReplicationSource;
+
+ public GrpcReplicationService(
+ ContainerReplicationSource containerReplicationSource) {
+ this.containerReplicationSource = containerReplicationSource;
+ }
+
+ @Override
+ public void download(CopyContainerRequestProto request,
+ StreamObserver<CopyContainerResponseProto> responseObserver) {
+ LOG.info("Streaming container data ({}) to other datanode",
+ request.getContainerID());
+ try {
+ GrpcOutputStream outputStream =
+ new GrpcOutputStream(responseObserver, request.getContainerID());
+ containerReplicationSource
+ .copyData(request.getContainerID(), outputStream);
+
+ } catch (IOException e) {
+ LOG.error("Can't stream the container data", e);
+ responseObserver.onError(e);
+ }
+ }
+
+ private static class GrpcOutputStream extends OutputStream
+ implements Closeable {
+
+ private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024;
+
+ private final StreamObserver<CopyContainerResponseProto> responseObserver;
+
+ private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ private long containerId;
+
+ private int readOffset = 0;
+
+ private int writtenBytes;
+
+ GrpcOutputStream(
+ StreamObserver<CopyContainerResponseProto> responseObserver,
+ long containerId) {
+ this.responseObserver = responseObserver;
+ this.containerId = containerId;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ buffer.write(b);
+ if (buffer.size() > BUFFER_SIZE_IN_BYTES) {
+ flushBuffer(false);
+ }
+ } catch (Exception ex) {
+ responseObserver.onError(ex);
+ }
+ }
+
+ private void flushBuffer(boolean eof) {
+ if (buffer.size() > 0) {
+ CopyContainerResponseProto response =
+ CopyContainerResponseProto.newBuilder()
+ .setContainerID(containerId)
+ .setData(ByteString.copyFrom(buffer.toByteArray()))
+ .setEof(eof)
+ .setReadOffset(readOffset)
+ .setLen(buffer.size())
+ .build();
+ responseObserver.onNext(response);
+ readOffset += buffer.size();
+ writtenBytes += buffer.size();
+ buffer.reset();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushBuffer(true);
+ LOG.info("{} bytes written to the rpc stream from container {}",
+ writtenBytes, containerId);
+ responseObserver.onCompleted();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
new file mode 100644
index 0000000..d557b54
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A naive implementation of the replication source which creates a tar file
+ * on-demand without pre-create the compressed archives.
+ */
+public class OnDemandContainerReplicationSource
+ implements ContainerReplicationSource {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerReplicationSource.class);
+
+ private ContainerSet containerSet;
+
+ private ContainerPacker packer = new TarContainerPacker();
+
+ public OnDemandContainerReplicationSource(
+ ContainerSet containerSet) {
+ this.containerSet = containerSet;
+ }
+
+ @Override
+ public void prepare(long containerId) {
+
+ }
+
+ @Override
+ public void copyData(long containerId, OutputStream destination)
+ throws IOException {
+
+ Container container = containerSet.getContainer(containerId);
+
+ Preconditions
+ .checkNotNull(container, "Container is not found " + containerId);
+
+ switch (container.getContainerType()) {
+ case KeyValueContainer:
+ packer.pack(container,
+ destination);
+ break;
+ default:
+ LOG.warn("Container type " + container.getContainerType()
+ + " is not replicable as no compression algorithm for that.");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
new file mode 100644
index 0000000..a461a98
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple ContainerDownloaderImplementation to download the missing container
+ * from the first available datanode.
+ * <p>
+ * This is not the most effective implementation as it uses only one source
+ * for he container download.
+ */
+public class SimpleContainerDownloader implements ContainerDownloader {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SimpleContainerDownloader.class);
+
+ private final Path workingDirectory;
+
+ private ExecutorService executor;
+
+ public SimpleContainerDownloader(Configuration conf) {
+
+ String workDirString =
+ conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
+
+ if (workDirString == null) {
+ workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
+ .resolve("container-copy");
+ } else {
+ workingDirectory = Paths.get(workDirString);
+ }
+
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Container downloader thread - %d").build();
+ executor = Executors.newSingleThreadExecutor(build);
+ LOG.info("Starting container downloader service to copy "
+ + "containers to replicate.");
+ }
+
+ @Override
+ public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+ List<DatanodeDetails> sourceDatanodes) {
+
+ CompletableFuture<Path> result = null;
+ for (DatanodeDetails datanode : sourceDatanodes) {
+ try {
+
+ if (result == null) {
+ GrpcReplicationClient grpcReplicationClient =
+ new GrpcReplicationClient(datanode.getIpAddress(),
+ datanode.getPort(Name.STANDALONE).getValue(),
+ workingDirectory);
+ result = grpcReplicationClient.download(containerId);
+ } else {
+ result = result.thenApply(CompletableFuture::completedFuture)
+ .exceptionally(t -> {
+ LOG.error("Error on replicating container: " + containerId, t);
+ GrpcReplicationClient grpcReplicationClient =
+ new GrpcReplicationClient(datanode.getIpAddress(),
+ datanode.getPort(Name.STANDALONE).getValue(),
+ workingDirectory);
+ return grpcReplicationClient.download(containerId);
+ }).thenCompose(Function.identity());
+ }
+ } catch (Exception ex) {
+ LOG.error(String.format(
+ "Container %s download from datanode %s was unsuccessful. "
+ + "Trying the next datanode", containerId, datanode), ex);
+ }
+
+ }
+ return result;
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Can't stop container downloader gracefully", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..38a853c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+/**
+ Classes to replicate container data between datanodes.
+**/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
new file mode 100644
index 0000000..6a14d33
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.TestGenericTestUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test replication command handler.
+ */
+public class TestReplicateContainerCommandHandler {
+
+ private static final String EXCEPTION_MESSAGE = "Oh my god";
+ private ReplicateContainerCommandHandler handler;
+ private StubDownloader downloader;
+ private ReplicateContainerCommand command;
+ private List<Long> importedContainerIds;
+
+ @Before
+ public void init() {
+ importedContainerIds = new ArrayList<>();
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ContainerSet containerSet = Mockito.mock(ContainerSet.class);
+ ContainerDispatcher containerDispatcher =
+ Mockito.mock(ContainerDispatcher.class);
+
+ downloader = new StubDownloader();
+
+ handler = new ReplicateContainerCommandHandler(conf, containerSet,
+ containerDispatcher, downloader) {
+ @Override
+ protected void importContainer(long containerID, Path tarFilePath) {
+ importedContainerIds.add(containerID);
+ }
+ };
+
+ //the command
+ ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
+ datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+ datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+
+ command = new ReplicateContainerCommand(1L, datanodeDetails);
+ }
+
+ @Test
+ public void handle() throws TimeoutException, InterruptedException {
+ //GIVEN
+
+ //WHEN
+ handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+ TestGenericTestUtils
+ .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+ Assert.assertNotNull(downloader.futureByContainers.get(1L));
+ downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
+
+ TestGenericTestUtils
+ .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
+ }
+
+ @Test
+ public void handleWithErrors() throws TimeoutException, InterruptedException {
+ //GIVEN
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(ReplicateContainerCommandHandler.LOG);
+
+ //WHEN
+ handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+ //THEN
+
+ TestGenericTestUtils
+ .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+ Assert.assertNotNull(downloader.futureByContainers.get(1L));
+ downloader.futureByContainers.get(1L)
+ .completeExceptionally(new IllegalArgumentException(
+ EXCEPTION_MESSAGE));
+
+ TestGenericTestUtils
+ .waitFor(() -> {
+ String output = logCapturer.getOutput();
+ return output.contains("unsuccessful") && output
+ .contains(EXCEPTION_MESSAGE); },
+ 100,
+ 2000);
+ }
+
+ private static class StubDownloader implements ContainerDownloader {
+
+ private Map<Long, CompletableFuture<Path>> futureByContainers =
+ new HashMap<>();
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public CompletableFuture<Path> getContainerDataFromReplicas(
+ long containerId, List<DatanodeDetails> sources) {
+ CompletableFuture<Path> future = new CompletableFuture<>();
+ futureByContainers.put(containerId, future);
+ return future;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 0000000..05ac76d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for command handlers.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
new file mode 100644
index 0000000..7391b25
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .DatanodeBlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
+ .writeChunkForContainer;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests ozone containers replication.
+ */
+public class TestContainerReplication {
+ /**
+ * Set the timeout for every test.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(300000);
+
+ @Test
+ public void testContainerReplication() throws Exception {
+ //GIVEN
+ OzoneConfiguration conf = newOzoneConfiguration();
+
+ long containerId = 1L;
+
+ conf.setSocketAddr("hdls.datanode.http-address",
+ new InetSocketAddress("0.0.0.0", 0));
+
+ MiniOzoneCluster cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+ .setRandomContainerPort(true).build();
+ cluster.waitForClusterToBeReady();
+
+ HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
+
+ //copy from the first datanode
+ List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+ sourceDatanodes.add(firstDatanode.getDatanodeDetails());
+
+ Pipeline sourcePipelines =
+ ContainerTestHelper.createPipeline(sourceDatanodes);
+
+ //create a new client
+ XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
+ client.connect();
+
+ //New container for testing
+ TestOzoneContainer.createContainerForTesting(client, containerId);
+
+ ContainerCommandRequestProto requestProto =
+ writeChunkForContainer(client, containerId, 1024);
+
+ DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
+
+ // Put Key to the test container
+ ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
+ .getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
+
+ ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
+
+ ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
+
+ Assert.assertNotNull(response);
+ Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+ Assert.assertTrue(putKeyRequest.getTraceID().equals(response.getTraceID()));
+
+ HddsDatanodeService destinationDatanode =
+ chooseDatanodeWithoutContainer(sourcePipelines,
+ cluster.getHddsDatanodes());
+
+ //WHEN: send the order to replicate the container
+ cluster.getStorageContainerManager().getScmNodeManager()
+ .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
+ new ReplicateContainerCommand(containerId,
+ sourcePipelines.getMachines()));
+
+ Thread.sleep(3000);
+
+ OzoneContainer ozoneContainer =
+ destinationDatanode.getDatanodeStateMachine().getContainer();
+
+
+
+ Container container =
+ ozoneContainer
+ .getContainerSet().getContainer(containerId);
+
+ Assert.assertNotNull(
+ "Container is not replicated to the destination datanode",
+ container);
+
+ Assert.assertNotNull(
+ "ContainerData of the replicated container is null",
+ container.getContainerData());
+
+ long keyCount = ((KeyValueContainerData) container.getContainerData())
+ .getKeyCount();
+
+ KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
+ .getHandler(ContainerType.KeyValueContainer);
+
+ KeyData key = handler.getKeyManager()
+ .getKey(container, BlockID.getFromProtobuf(blockID));
+
+ Assert.assertNotNull(key);
+ Assert.assertEquals(1, key.getChunks().size());
+ Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
+ key.getChunks().get(0));
+
+ }
+
+ private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
+ List<HddsDatanodeService> dataNodes) {
+ for (HddsDatanodeService datanode : dataNodes) {
+ if (!pipeline.getMachines().contains(datanode.getDatanodeDetails())) {
+ return datanode;
+ }
+ }
+ throw new AssertionError("No datanode outside of the pipeline");
+ }
+
+ static OzoneConfiguration newOzoneConfiguration() {
+ final OzoneConfiguration conf = new OzoneConfiguration();
+ return conf;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
deleted file mode 100644
index 9e08212..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
- .OZONE_SCM_CONTAINER_SIZE;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the datanode, when replicate container command is
- * received.
- */
-public class TestReplicateContainerHandler {
-
- @Test
- public void test() throws IOException, TimeoutException, InterruptedException,
- OzoneException {
-
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(ReplicateContainerCommandHandler.LOG);
-
- OzoneConfiguration conf = new OzoneConfiguration();
- conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
- MiniOzoneCluster cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
- cluster.waitForClusterToBeReady();
-
- DatanodeDetails datanodeDetails =
- cluster.getHddsDatanodes().get(0).getDatanodeDetails();
- //send the order to replicate the container
- cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new ReplicateContainerCommand(1L,
- new ArrayList<>()));
-
- //TODO: here we test only the serialization/unserialization as
- // the implementation is not yet done
- GenericTestUtils
- .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
- 5 * 1000);
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f112d26..5dd88fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -513,7 +513,7 @@ public class TestOzoneContainer {
return new XceiverClientGrpc(pipeline, conf);
}
- private static void createContainerForTesting(XceiverClientSpi client,
+ public static void createContainerForTesting(XceiverClientSpi client,
long containerID) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request =
@@ -525,7 +525,7 @@ public class TestOzoneContainer {
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
}
- private static ContainerProtos.ContainerCommandRequestProto
+ public static ContainerProtos.ContainerCommandRequestProto
writeChunkForContainer(XceiverClientSpi client,
long containerID, int dataLen) throws Exception {
// Write Chunk
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org