You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/19 18:22:25 UTC
hadoop git commit: HDDS-460. Replication manager failed to import
container data. Contributed by Elek, Marton.
Repository: hadoop
Updated Branches:
refs/heads/trunk efdea85ad -> 042bf74d5
HDDS-460. Replication manager failed to import container data. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/042bf74d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/042bf74d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/042bf74d
Branch: refs/heads/trunk
Commit: 042bf74d5eb3dc627658b8c4c027628569169513
Parents: efdea85
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 23:51:50 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 23:51:50 2018 +0530
----------------------------------------------------------------------
.../statemachine/DatanodeStateMachine.java | 19 ++-
.../ReplicateContainerCommandHandler.java | 120 ++------------
.../replication/ContainerReplicator.java | 27 +++
.../DownloadAndImportReplicator.java | 136 ++++++++++++++++
.../replication/GrpcReplicationClient.java | 2 +-
.../replication/ReplicationSupervisor.java | 142 ++++++++++++++++
.../container/replication/ReplicationTask.java | 102 ++++++++++++
.../TestReplicateContainerCommandHandler.java | 163 -------------------
.../replication/TestReplicationSupervisor.java | 143 ++++++++++++++++
.../container/replication/package-info.java | 22 +++
10 files changed, 602 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 875d063..1bade8e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -40,7 +40,12 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ReplicateContainerCommandHandler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -69,6 +74,7 @@ public class DatanodeStateMachine implements Closeable {
private AtomicLong nextHB;
private Thread stateMachineThread = null;
private Thread cmdProcessThread = null;
+ private final ReplicationSupervisor supervisor;
/**
* Constructs a a datanode state machine.
@@ -89,14 +95,21 @@ public class DatanodeStateMachine implements Closeable {
new OzoneConfiguration(conf), context);
nextHB = new AtomicLong(Time.monotonicNow());
+ ContainerReplicator replicator =
+ new DownloadAndImportReplicator(container.getContainerSet(),
+ container.getDispatcher(),
+ new SimpleContainerDownloader(conf), new TarContainerPacker());
+
+ supervisor =
+ new ReplicationSupervisor(container.getContainerSet(), replicator, 10);
+
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new CloseContainerCommandHandler())
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
- .addHandler(new ReplicateContainerCommandHandler(conf,
- container.getContainerSet(), container.getDispatcher()))
+ .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
@@ -295,6 +308,7 @@ public class DatanodeStateMachine implements Closeable {
public void startDaemon() {
Runnable startStateMachineTask = () -> {
try {
+ supervisor.start();
start();
LOG.info("Ozone container server started.");
} catch (Exception ex) {
@@ -323,6 +337,7 @@ public class DatanodeStateMachine implements Closeable {
*/
public synchronized void stopDaemon() {
try {
+ supervisor.stop();
context.setState(DatanodeStates.SHUTDOWN);
reportManager.shutdown();
this.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index cb677c2..09c379f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -16,33 +16,17 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import java.io.FileInputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.statemachine
- .SCMConnectionManager;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -58,39 +42,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
- private ContainerDispatcher containerDispatcher;
-
private int invocationCount;
private long totalTime;
- private ContainerDownloader downloader;
-
private Configuration conf;
- private TarContainerPacker packer = new TarContainerPacker();
-
- private ContainerSet containerSet;
-
- private Lock lock = new ReentrantLock();
+ private ReplicationSupervisor supervisor;
public ReplicateContainerCommandHandler(
Configuration conf,
- ContainerSet containerSet,
- ContainerDispatcher containerDispatcher,
- ContainerDownloader downloader) {
+ ReplicationSupervisor supervisor) {
this.conf = conf;
- this.containerSet = containerSet;
- this.downloader = downloader;
- this.containerDispatcher = containerDispatcher;
- }
-
- public ReplicateContainerCommandHandler(
- Configuration conf,
- ContainerSet containerSet,
- ContainerDispatcher containerDispatcher) {
- this(conf, containerSet, containerDispatcher,
- new SimpleContainerDownloader(conf));
+ this.supervisor = supervisor;
}
@Override
@@ -108,72 +72,12 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
String.format("Replication command is received for container %d "
+ "but the size of source datanodes was 0.", containerID));
- LOG.info("Starting replication of container {} from {}", containerID,
- sourceDatanodes);
- CompletableFuture<Path> tempTarFile = downloader
- .getContainerDataFromReplicas(containerID,
- sourceDatanodes);
-
- CompletableFuture<Void> result =
- tempTarFile.thenAccept(path -> {
- LOG.info("Container {} is downloaded, starting to import.",
- containerID);
- importContainer(containerID, path);
- });
-
- result.whenComplete((aVoid, throwable) -> {
- if (throwable != null) {
- LOG.error("Container replication was unsuccessful .", throwable);
- } else {
- LOG.info("Container {} is replicated successfully", containerID);
- }
- });
- } finally {
- updateCommandStatus(context, command, true, LOG);
+ ReplicationTask replicationTask =
+ new ReplicationTask(containerID, sourceDatanodes);
+ supervisor.addTask(replicationTask);
- }
- }
-
- protected void importContainer(long containerID, Path tarFilePath) {
- lock.lock();
- try {
- ContainerData originalContainerData;
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
- byte[] containerDescriptorYaml =
- packer.unpackContainerDescriptor(tempContainerTarStream);
- originalContainerData = ContainerDataYaml.readContainer(
- containerDescriptorYaml);
- }
-
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
-
- Handler handler = containerDispatcher.getHandler(
- originalContainerData.getContainerType());
-
- Container container = handler.importContainer(containerID,
- originalContainerData.getMaxSize(),
- tempContainerTarStream,
- packer);
-
- containerSet.addContainer(container);
- }
-
- } catch (Exception e) {
- LOG.error(
- "Can't import the downloaded container data id=" + containerID,
- e);
- try {
- Files.delete(tarFilePath);
- } catch (Exception ex) {
- LOG.error(
- "Container import is failed and the downloaded file can't be "
- + "deleted: "
- + tarFilePath.toAbsolutePath().toString());
- }
} finally {
- lock.unlock();
+ updateCommandStatus(context, command, true, LOG);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
new file mode 100644
index 0000000..827b9d6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+/**
+ * Service to do the real replication task.
+ *
+ * An implementation should download the container and import it.
+ */
+public interface ContainerReplicator {
+ void replicate(ReplicationTask task);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
new file mode 100644
index 0000000..5ef5841
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default replication implementation.
+ * <p>
+ * This class does the real job: it executes the download and imports the
+ * container into the container set.
+ */
+public class DownloadAndImportReplicator implements ContainerReplicator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DownloadAndImportReplicator.class);
+
+ private final ContainerSet containerSet;
+
+ private final ContainerDispatcher containerDispatcher;
+
+ private final ContainerDownloader downloader;
+
+ private final TarContainerPacker packer;
+
+ public DownloadAndImportReplicator(
+ ContainerSet containerSet,
+ ContainerDispatcher containerDispatcher,
+ ContainerDownloader downloader,
+ TarContainerPacker packer) {
+ this.containerSet = containerSet;
+ this.containerDispatcher = containerDispatcher;
+ this.downloader = downloader;
+ this.packer = packer;
+ }
+
+ public void importContainer(long containerID, Path tarFilePath) {
+ try {
+ ContainerData originalContainerData;
+ try (FileInputStream tempContainerTarStream = new FileInputStream(
+ tarFilePath.toFile())) {
+ byte[] containerDescriptorYaml =
+ packer.unpackContainerDescriptor(tempContainerTarStream);
+ originalContainerData = ContainerDataYaml.readContainer(
+ containerDescriptorYaml);
+ }
+
+ try (FileInputStream tempContainerTarStream = new FileInputStream(
+ tarFilePath.toFile())) {
+
+ Handler handler = containerDispatcher.getHandler(
+ originalContainerData.getContainerType());
+
+ Container container = handler.importContainer(containerID,
+ originalContainerData.getMaxSize(),
+ tempContainerTarStream,
+ packer);
+
+ containerSet.addContainer(container);
+ }
+
+ } catch (Exception e) {
+ LOG.error(
+ "Can't import the downloaded container data id=" + containerID,
+ e);
+ try {
+ Files.delete(tarFilePath);
+ } catch (Exception ex) {
+ LOG.error(
+ "Container import is failed and the downloaded file can't be "
+ + "deleted: "
+ + tarFilePath.toAbsolutePath().toString());
+ }
+ }
+ }
+
+ @Override
+ public void replicate(ReplicationTask task) {
+ long containerID = task.getContainerId();
+
+ List<DatanodeDetails> sourceDatanodes = task.getSources();
+
+ LOG.info("Starting replication of container {} from {}", containerID,
+ sourceDatanodes);
+
+ CompletableFuture<Path> tempTarFile = downloader
+ .getContainerDataFromReplicas(containerID,
+ sourceDatanodes);
+
+ try {
+ //wait for the download. This thread pool is limiting the parallel
+ //downloads, so it's ok to block here and wait for the full download.
+ Path path = tempTarFile.get();
+ LOG.info("Container {} is downloaded, starting to import.",
+ containerID);
+ importContainer(containerID, path);
+ LOG.info("Container {} is replicated successfully", containerID);
+ task.setStatus(Status.DONE);
+ } catch (Exception e) {
+ LOG.error("Container replication was unsuccessful .", e);
+ task.setStatus(Status.FAILED);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 91d098f..3aafb0c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -157,8 +157,8 @@ public class GrpcReplicationClient {
public void onCompleted() {
try {
stream.close();
- response.complete(outputPath);
LOG.info("Container is downloaded to {}", outputPath);
+ response.complete(outputPath);
} catch (IOException e) {
response.completeExceptionally(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
new file mode 100644
index 0000000..1d8d5f6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single point to schedule the downloading tasks based on priorities.
+ */
+public class ReplicationSupervisor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReplicationSupervisor.class);
+
+ private final Set<Worker> threadPool = new HashSet<>();
+
+ private final Map<Long, ReplicationTask> queue = new TreeMap();
+
+ private final ContainerSet containerSet;
+
+ private final ContainerReplicator replicator;
+
+ private final int poolSize;
+
+ public ReplicationSupervisor(
+ ContainerSet containerSet,
+ ContainerReplicator replicator, int poolSize) {
+ this.containerSet = containerSet;
+ this.replicator = replicator;
+ this.poolSize = poolSize;
+ }
+
+ public synchronized void addTask(ReplicationTask task) {
+ queue.putIfAbsent(task.getContainerId(), task);
+ synchronized (threadPool) {
+ threadPool.notify();
+ }
+ }
+
+ public void start() {
+ for (int i = 0; i < poolSize; i++) {
+ Worker worker = new Worker();
+ Thread thread = new Thread(worker, "ContainerReplication-" + i);
+ thread.setDaemon(true);
+ thread.start();
+ threadPool.add(worker);
+ }
+ }
+
+ public synchronized ReplicationTask selectTask() {
+ for (ReplicationTask task : queue.values()) {
+ if (task.getStatus() == Status.QUEUED) {
+ if (containerSet.getContainer(task.getContainerId()) == null) {
+ task.setStatus(Status.DOWNLOADING);
+ return task;
+ } else {
+ LOG.debug("Container {} has already been downloaded.",
+ task.getContainerId());
+ queue.remove(task.getContainerId());
+ }
+ } else if (task.getStatus() == Status.FAILED) {
+ LOG.error(
+ "Container {} can't be downloaded from any of the datanodes.",
+ task.getContainerId());
+ queue.remove(task.getContainerId());
+ } else if (task.getStatus() == Status.DONE) {
+ queue.remove(task.getContainerId());
+ LOG.info("Container {} is replicated.", task.getContainerId());
+ }
+ }
+ //no available task.
+ return null;
+ }
+
+ public void stop() {
+ for (Worker worker : threadPool) {
+ worker.stop();
+ }
+ }
+
+ @VisibleForTesting
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ private class Worker implements Runnable {
+
+ private boolean running = true;
+
+ @Override
+ public void run() {
+ try {
+ while (running) {
+ ReplicationTask task = selectTask();
+ if (task == null) {
+ synchronized (threadPool) {
+ threadPool.wait();
+ }
+ } else {
+ replicator.replicate(task);
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Error on doing replication", ex);
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ LOG.error("Error on waiting after failed replication task", e);
+ }
+ }
+ }
+
+ public void stop() {
+ running = false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
new file mode 100644
index 0000000..9019811
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * The task to download a container from the sources.
+ */
+public class ReplicationTask {
+
+ private volatile Status status = Status.QUEUED;
+
+ private final long containerId;
+
+ private List<DatanodeDetails> sources;
+
+ private final Instant queued = Instant.now();
+
+ public ReplicationTask(long containerId,
+ List<DatanodeDetails> sources) {
+ this.containerId = containerId;
+ this.sources = sources;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReplicationTask that = (ReplicationTask) o;
+ return containerId == that.containerId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(containerId);
+ }
+
+ public long getContainerId() {
+ return containerId;
+ }
+
+ public List<DatanodeDetails> getSources() {
+ return sources;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(
+ Status status) {
+ this.status = status;
+ }
+
+ @Override
+ public String toString() {
+ return "ReplicationTask{" +
+ "status=" + status +
+ ", containerId=" + containerId +
+ ", sources=" + sources +
+ ", queued=" + queued +
+ '}';
+ }
+
+ public Instant getQueued() {
+ return queued;
+ }
+
+ /**
+ * Status of the replication.
+ */
+ public enum Status {
+ QUEUED,
+ DOWNLOADING,
+ FAILED,
+ DONE
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
deleted file mode 100644
index 6529922..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.TestGenericTestUtils;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test replication command handler.
- */
-public class TestReplicateContainerCommandHandler {
-
- private static final String EXCEPTION_MESSAGE = "Oh my god";
- private ReplicateContainerCommandHandler handler;
- private StubDownloader downloader;
- private ReplicateContainerCommand command;
- private List<Long> importedContainerIds;
-
- @Before
- public void init() {
- importedContainerIds = new ArrayList<>();
-
- OzoneConfiguration conf = new OzoneConfiguration();
- ContainerSet containerSet = Mockito.mock(ContainerSet.class);
- ContainerDispatcher containerDispatcher =
- Mockito.mock(ContainerDispatcher.class);
-
- downloader = new StubDownloader();
-
- handler = new ReplicateContainerCommandHandler(conf, containerSet,
- containerDispatcher, downloader) {
- @Override
- protected void importContainer(long containerID, Path tarFilePath) {
- importedContainerIds.add(containerID);
- }
- };
-
- //the command
- ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
- datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
- datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
-
- command = new ReplicateContainerCommand(1L, datanodeDetails);
- }
-
- @Test
- public void handle() throws TimeoutException, InterruptedException {
- //GIVEN
-
- //WHEN
- handler.handle(command, null, Mockito.mock(StateContext.class), null);
-
- TestGenericTestUtils
- .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
-
- Assert.assertNotNull(downloader.futureByContainers.get(1L));
- downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
-
- TestGenericTestUtils
- .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
- }
-
- @Test
- public void handleWithErrors() throws TimeoutException, InterruptedException {
- //GIVEN
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(ReplicateContainerCommandHandler.LOG);
-
- //WHEN
- handler.handle(command, null, Mockito.mock(StateContext.class), null);
-
- //THEN
- TestGenericTestUtils
- .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
-
- Assert.assertNotNull(downloader.futureByContainers.get(1L));
- downloader.futureByContainers.get(1L)
- .completeExceptionally(new IllegalArgumentException(
- EXCEPTION_MESSAGE));
-
- TestGenericTestUtils
- .waitFor(() -> {
- String output = logCapturer.getOutput();
- return output.contains("unsuccessful") && output
- .contains(EXCEPTION_MESSAGE); },
- 100,
- 2000);
- }
-
- /**
- * Can't handle a command if there are no source replicas.
- */
- @Test(expected = IllegalArgumentException.class)
- public void handleWithoutReplicas()
- throws TimeoutException, InterruptedException {
- //GIVEN
- ReplicateContainerCommand commandWithoutReplicas =
- new ReplicateContainerCommand(1L, new ArrayList<>());
-
- //WHEN
- handler
- .handle(commandWithoutReplicas,
- null,
- Mockito.mock(StateContext.class),
- null);
-
- }
- private static class StubDownloader implements ContainerDownloader {
-
- private Map<Long, CompletableFuture<Path>> futureByContainers =
- new HashMap<>();
-
- @Override
- public void close() {
-
- }
-
- @Override
- public CompletableFuture<Path> getContainerDataFromReplicas(
- long containerId, List<DatanodeDetails> sources) {
- CompletableFuture<Path> future = new CompletableFuture<>();
- futureByContainers.put(containerId, future);
- return future;
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
new file mode 100644
index 0000000..d433319
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link ReplicationSupervisor}, driven through a slow in-memory
+ * {@link ContainerReplicator} stub instead of a real container download.
+ */
+public class TestReplicationSupervisor {
+
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  /**
+   * Submits six tasks that cover three distinct container ids and expects
+   * exactly three replications to be recorded by the stub replicator.
+   *
+   * @throws InterruptedException if the test thread is interrupted while it
+   *     waits for the supervisor to work through the tasks.
+   */
+  @Test
+  public void normal() throws InterruptedException {
+    //GIVEN
+    ContainerSet set = new ContainerSet();
+
+    FakeReplicator replicator = new FakeReplicator(set);
+    ReplicationSupervisor supervisor =
+        new ReplicationSupervisor(set, replicator, 5);
+
+    List<DatanodeDetails> datanodes = createDatanodes();
+
+    try {
+      supervisor.start();
+      //WHEN
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(2L, datanodes));
+      supervisor.addTask(new ReplicationTask(2L, datanodes));
+      supervisor.addTask(new ReplicationTask(3L, datanodes));
+      // Give the supervisor time to work through the submitted tasks; an
+      // interrupt is propagated to the test runner instead of being hidden.
+      Thread.sleep(300);
+
+      //THEN
+      Assert
+          .assertEquals(3, replicator.replicated.size());
+
+    } finally {
+      supervisor.stop();
+    }
+  }
+
+  /**
+   * Submits the same container id twice with a pause in between and expects
+   * a single replication, with the second task still counted in the queue.
+   *
+   * @throws InterruptedException if the test thread is interrupted while it
+   *     waits for the supervisor.
+   */
+  @Test
+  public void duplicateMessageAfterAWhile() throws InterruptedException {
+    //GIVEN
+    ContainerSet set = new ContainerSet();
+
+    FakeReplicator replicator = new FakeReplicator(set);
+    ReplicationSupervisor supervisor =
+        new ReplicationSupervisor(set, replicator, 2);
+
+    List<DatanodeDetails> datanodes = createDatanodes();
+
+    try {
+      supervisor.start();
+      //WHEN
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      Thread.sleep(400);
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      Thread.sleep(300);
+
+      //THEN
+      Assert
+          .assertEquals(1, replicator.replicated.size());
+
+      //the last item is still in the queue as we cleanup the queue during the
+      // selection
+      Assert.assertEquals(1, supervisor.getQueueSize());
+
+    } finally {
+      supervisor.stop();
+    }
+  }
+
+  /**
+   * Creates the list of mocked source datanodes shared by the tests.
+   *
+   * @return a list holding two {@link DatanodeDetails} mocks.
+   */
+  private static List<DatanodeDetails> createDatanodes() {
+    return IntStream.range(1, 3)
+        .mapToObj(v -> Mockito.mock(DatanodeDetails.class))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Replicator stub: sleeps 100 ms to imitate a slow download, records the
+   * task and adds a new {@link KeyValueContainer} carrying the task's
+   * container id to the container set.
+   */
+  private class FakeReplicator implements ContainerReplicator {
+
+    // NOTE(review): replicate() is presumably invoked from the supervisor's
+    // worker threads while the test thread reads this list, hence the
+    // synchronized wrapper - confirm against ReplicationSupervisor.
+    private final List<ReplicationTask> replicated =
+        Collections.synchronizedList(new ArrayList<>());
+
+    private final ContainerSet containerSet;
+
+    FakeReplicator(ContainerSet set) {
+      this.containerSet = set;
+    }
+
+    /**
+     * Records the task and registers a container for it.
+     *
+     * @param task the replication task to fake.
+     * @throws RuntimeException wrapping any failure; if the failure is an
+     *     interrupt, the thread's interrupt flag is restored first.
+     */
+    @Override
+    public void replicate(ReplicationTask task) {
+      KeyValueContainerData kvcd =
+          new KeyValueContainerData(task.getContainerId(), 100L);
+      KeyValueContainer kvc =
+          new KeyValueContainer(kvcd, conf);
+      try {
+        //download is slow
+        Thread.sleep(100);
+        replicated.add(task);
+        containerSet.addContainer(kvc);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to whoever owns this thread.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/042bf74d/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..5c905e0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Tests for the container replication.
+ */
+package org.apache.hadoop.ozone.container.replication;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org