You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/09/12 10:25:05 UTC
[58/90] [abbrv] hadoop git commit: HDDS-432. Replication of closed
containers is not working. Contributed by Elek, Marton.
HDDS-432. Replication of closed containers is not working.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c238ffc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c238ffc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c238ffc
Branch: refs/heads/YARN-3409
Commit: 9c238ffc301c9aa1ae0f811c065e7426b1e23540
Parents: a406f6f
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Sep 11 17:00:04 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Sep 11 17:00:04 2018 -0700
----------------------------------------------------------------------
.../ReplicateContainerCommandHandler.java | 15 ++-
.../TestReplicateContainerCommandHandler.java | 19 +++-
.../replication/ReplicationManager.java | 18 +++-
.../replication/TestReplicationManager.java | 104 +++++++++++--------
4 files changed, 105 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index d1895a8..cb677c2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -44,6 +46,7 @@ import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,13 +100,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
ReplicateContainerCommand replicateCommand =
(ReplicateContainerCommand) command;
try {
-
+ List<DatanodeDetails> sourceDatanodes =
+ replicateCommand.getSourceDatanodes();
long containerID = replicateCommand.getContainerID();
+
+ Preconditions.checkArgument(sourceDatanodes.size() > 0,
+ String.format("Replication command is received for container %d "
+ + "but the size of source datanodes was 0.", containerID));
+
LOG.info("Starting replication of container {} from {}", containerID,
- replicateCommand.getSourceDatanodes());
+ sourceDatanodes);
CompletableFuture<Path> tempTarFile = downloader
.getContainerDataFromReplicas(containerID,
- replicateCommand.getSourceDatanodes());
+ sourceDatanodes);
CompletableFuture<Void> result =
tempTarFile.thenAccept(path -> {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
index 6a14d33..6529922 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -106,7 +106,6 @@ public class TestReplicateContainerCommandHandler {
handler.handle(command, null, Mockito.mock(StateContext.class), null);
//THEN
-
TestGenericTestUtils
.waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
@@ -124,6 +123,24 @@ public class TestReplicateContainerCommandHandler {
2000);
}
+ /**
+ * Can't handle a command if there are no source replicas.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void handleWithoutReplicas()
+ throws TimeoutException, InterruptedException {
+ //GIVEN
+ ReplicateContainerCommand commandWithoutReplicas =
+ new ReplicateContainerCommand(1L, new ArrayList<>());
+
+ //WHEN
+ handler
+ .handle(commandWithoutReplicas,
+ null,
+ Mockito.mock(StateContext.class),
+ null);
+
+ }
private static class StubDownloader implements ContainerDownloader {
private Map<Long, CompletableFuture<Path>> futureByContainers =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4a980f7..ddecdbc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -116,7 +117,15 @@ public class ReplicationManager implements Runnable {
//check the current replication
List<DatanodeDetails> datanodesWithReplicas =
- getCurrentReplicas(request);
+ new ArrayList<>(getCurrentReplicas(request));
+
+ if (datanodesWithReplicas.size() == 0) {
+ LOG.warn(
+ "Container {} should be replicated but can't find any existing "
+ + "replicas",
+ containerID);
+ return;
+ }
ReplicationRequest finalRequest = request;
@@ -165,11 +174,10 @@ public class ReplicationManager implements Runnable {
}
@VisibleForTesting
- protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+ protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
throws IOException {
- //TODO: replication information is not yet available after HDDS-175,
- // should be fixed after HDDS-228
- return new ArrayList<>();
+ return containerStateManager
+ .getContainerReplicas(new ContainerID(request.getContainerId()));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index da05913..06beb7c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.container.replication;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -26,27 +28,22 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
- .TRACK_REPLICATE_COMMAND;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -69,6 +66,8 @@ public class TestReplicationManager {
private ContainerPlacementPolicy containerPlacementPolicy;
private List<DatanodeDetails> listOfDatanodeDetails;
+ private LeaseManager<Long> leaseManager;
+ private ReplicationManager replicationManager;
@Before
public void initReplicationManager() throws IOException {
@@ -86,7 +85,6 @@ public class TestReplicationManager {
containerStateManager = Mockito.mock(ContainerStateManager.class);
- //container with 2 replicas
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(LifeCycleState.CLOSED)
.build();
@@ -94,6 +92,16 @@ public class TestReplicationManager {
when(containerStateManager.getContainer(anyObject()))
.thenReturn(containerInfo);
+ when(containerStateManager.getContainerReplicas(new ContainerID(1L)))
+ .thenReturn(new HashSet<>(Arrays.asList(
+ listOfDatanodeDetails.get(0),
+ listOfDatanodeDetails.get(1)
+ )));
+
+
+ when(containerStateManager.getContainerReplicas(new ContainerID(3L)))
+ .thenReturn(new HashSet<>());
+
queue = new EventQueue();
trackReplicationEvents = new ArrayList<>();
@@ -104,32 +112,53 @@ public class TestReplicationManager {
queue.addHandler(SCMEvents.DATANODE_COMMAND,
(event, publisher) -> copyEvents.add(event));
+ leaseManager = new LeaseManager<>("Test", 100000L);
+
+ replicationManager = new ReplicationManager(containerPlacementPolicy,
+ containerStateManager, queue, leaseManager);
+
+
+
+ }
+
+ /**
+ * Container should be replicated but no source replicas.
+ */
+ @Test()
+ public void testNoExistingReplicas() throws InterruptedException {
+ try {
+ leaseManager.start();
+ replicationManager.start();
+
+ //WHEN
+ queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
+ (short) 3));
+
+ Thread.sleep(500L);
+ queue.processAll(1000L);
+
+ //THEN
+ Assert.assertEquals(0, trackReplicationEvents.size());
+ Assert.assertEquals(0, copyEvents.size());
+
+ } finally {
+ if (leaseManager != null) {
+ leaseManager.shutdown();
+ }
+ }
}
@Test
public void testEventSending() throws InterruptedException, IOException {
-
//GIVEN
-
- LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 100000L);
try {
leaseManager.start();
- ReplicationManager replicationManager =
- new ReplicationManager(containerPlacementPolicy,
- containerStateManager,
- queue, leaseManager) {
- @Override
- protected List<DatanodeDetails> getCurrentReplicas(
- ReplicationRequest request) throws IOException {
- return listOfDatanodeDetails.subList(0, 2);
- }
- };
replicationManager.start();
//WHEN
-
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
(short) 3));
@@ -138,7 +167,6 @@ public class TestReplicationManager {
queue.processAll(1000L);
//THEN
-
Assert.assertEquals(1, trackReplicationEvents.size());
Assert.assertEquals(1, copyEvents.size());
} finally {
@@ -150,22 +178,14 @@ public class TestReplicationManager {
@Test
public void testCommandWatcher() throws InterruptedException, IOException {
+ LeaseManager<Long> rapidLeaseManager =
+ new LeaseManager<>("Test", 1000L);
- Logger.getRootLogger().setLevel(Level.DEBUG);
- LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 1000L);
+ replicationManager = new ReplicationManager(containerPlacementPolicy,
+ containerStateManager, queue, rapidLeaseManager);
try {
- leaseManager.start();
-
- ReplicationManager replicationManager =
- new ReplicationManager(containerPlacementPolicy,
- containerStateManager, queue, leaseManager) {
- @Override
- protected List<DatanodeDetails> getCurrentReplicas(
- ReplicationRequest request) throws IOException {
- return listOfDatanodeDetails.subList(0, 2);
- }
- };
+ rapidLeaseManager.start();
replicationManager.start();
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
@@ -192,8 +212,8 @@ public class TestReplicationManager {
Assert.assertEquals(2, copyEvents.size());
} finally {
- if (leaseManager != null) {
- leaseManager.shutdown();
+ if (rapidLeaseManager != null) {
+ rapidLeaseManager.shutdown();
}
}
}
@@ -209,7 +229,7 @@ public class TestReplicationManager {
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
PipelineID.randomId());
pipeline.addMember(leader);
- for (; i.hasNext(); ) {
+ while (i.hasNext()) {
pipeline.addMember(i.next());
}
return pipeline;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org