You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/09/12 10:25:05 UTC

[58/90] [abbrv] hadoop git commit: HDDS-432. Replication of closed containers is not working. Contributed by Elek, Marton.

HDDS-432. Replication of closed containers is not working.
Contributed by Elek, Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c238ffc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c238ffc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c238ffc

Branch: refs/heads/YARN-3409
Commit: 9c238ffc301c9aa1ae0f811c065e7426b1e23540
Parents: a406f6f
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Sep 11 17:00:04 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Sep 11 17:00:04 2018 -0700

----------------------------------------------------------------------
 .../ReplicateContainerCommandHandler.java       |  15 ++-
 .../TestReplicateContainerCommandHandler.java   |  19 +++-
 .../replication/ReplicationManager.java         |  18 +++-
 .../replication/TestReplicationManager.java     | 104 +++++++++++--------
 4 files changed, 105 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index d1895a8..cb677c2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 import java.io.FileInputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -44,6 +46,7 @@ import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,13 +100,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
     ReplicateContainerCommand replicateCommand =
         (ReplicateContainerCommand) command;
     try {
-
+      List<DatanodeDetails> sourceDatanodes =
+          replicateCommand.getSourceDatanodes();
       long containerID = replicateCommand.getContainerID();
+
+      Preconditions.checkArgument(sourceDatanodes.size() > 0,
+          String.format("Replication command is received for container %d "
+              + "but the size of source datanodes was 0.", containerID));
+
       LOG.info("Starting replication of container {} from {}", containerID,
-          replicateCommand.getSourceDatanodes());
+          sourceDatanodes);
       CompletableFuture<Path> tempTarFile = downloader
           .getContainerDataFromReplicas(containerID,
-              replicateCommand.getSourceDatanodes());
+              sourceDatanodes);
 
       CompletableFuture<Void> result =
           tempTarFile.thenAccept(path -> {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
index 6a14d33..6529922 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -106,7 +106,6 @@ public class TestReplicateContainerCommandHandler {
     handler.handle(command, null, Mockito.mock(StateContext.class), null);
 
     //THEN
-
     TestGenericTestUtils
         .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
 
@@ -124,6 +123,24 @@ public class TestReplicateContainerCommandHandler {
             2000);
   }
 
+  /**
+   * Can't handle a command if there are no source replicas.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void handleWithoutReplicas()
+      throws TimeoutException, InterruptedException {
+    //GIVEN
+    ReplicateContainerCommand commandWithoutReplicas =
+        new ReplicateContainerCommand(1L, new ArrayList<>());
+
+    //WHEN
+    handler
+        .handle(commandWithoutReplicas,
+            null,
+            Mockito.mock(StateContext.class),
+            null);
+
+  }
   private static class StubDownloader implements ContainerDownloader {
 
     private Map<Long, CompletableFuture<Path>> futureByContainers =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 4a980f7..ddecdbc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -116,7 +117,15 @@ public class ReplicationManager implements Runnable {
 
         //check the current replication
         List<DatanodeDetails> datanodesWithReplicas =
-            getCurrentReplicas(request);
+            new ArrayList<>(getCurrentReplicas(request));
+
+        if (datanodesWithReplicas.size() == 0) {
+          LOG.warn(
+              "Container {} should be replicated but can't find any existing "
+                  + "replicas",
+              containerID);
+          return;
+        }
 
         ReplicationRequest finalRequest = request;
 
@@ -165,11 +174,10 @@ public class ReplicationManager implements Runnable {
   }
 
   @VisibleForTesting
-  protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+  protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
       throws IOException {
-    //TODO: replication information is not yet available after HDDS-175,
-    // should be fixed after HDDS-228
-    return new ArrayList<>();
+    return containerStateManager
+        .getContainerReplicas(new ContainerID(request.getContainerId()));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c238ffc/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index da05913..06beb7c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -26,27 +28,22 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_REPLICATE_COMMAND;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,6 +66,8 @@ public class TestReplicationManager {
 
   private ContainerPlacementPolicy containerPlacementPolicy;
   private List<DatanodeDetails> listOfDatanodeDetails;
+  private LeaseManager<Long> leaseManager;
+  private ReplicationManager replicationManager;
 
   @Before
   public void initReplicationManager() throws IOException {
@@ -86,7 +85,6 @@ public class TestReplicationManager {
 
     containerStateManager = Mockito.mock(ContainerStateManager.class);
 
-    //container with 2 replicas
     ContainerInfo containerInfo = new ContainerInfo.Builder()
         .setState(LifeCycleState.CLOSED)
         .build();
@@ -94,6 +92,16 @@ public class TestReplicationManager {
     when(containerStateManager.getContainer(anyObject()))
         .thenReturn(containerInfo);
 
+    when(containerStateManager.getContainerReplicas(new ContainerID(1L)))
+        .thenReturn(new HashSet<>(Arrays.asList(
+            listOfDatanodeDetails.get(0),
+            listOfDatanodeDetails.get(1)
+        )));
+
+
+    when(containerStateManager.getContainerReplicas(new ContainerID(3L)))
+        .thenReturn(new HashSet<>());
+
     queue = new EventQueue();
 
     trackReplicationEvents = new ArrayList<>();
@@ -104,32 +112,53 @@ public class TestReplicationManager {
     queue.addHandler(SCMEvents.DATANODE_COMMAND,
         (event, publisher) -> copyEvents.add(event));
 
+    leaseManager = new LeaseManager<>("Test", 100000L);
+
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        containerStateManager, queue, leaseManager);
+
+
+
+  }
+
+  /**
+   * Container should be replicated but there are no source replicas.
+   */
+  @Test()
+  public void testNoExistingReplicas() throws InterruptedException {
+    try {
+      leaseManager.start();
+      replicationManager.start();
+
+      //WHEN
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
+              (short) 3));
+
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      //THEN
+      Assert.assertEquals(0, trackReplicationEvents.size());
+      Assert.assertEquals(0, copyEvents.size());
+
+    } finally {
+      if (leaseManager != null) {
+        leaseManager.shutdown();
+      }
+    }
   }
 
   @Test
   public void testEventSending() throws InterruptedException, IOException {
 
-
     //GIVEN
-
-    LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 100000L);
     try {
       leaseManager.start();
 
-      ReplicationManager replicationManager =
-          new ReplicationManager(containerPlacementPolicy,
-              containerStateManager,
-              queue, leaseManager) {
-            @Override
-            protected List<DatanodeDetails> getCurrentReplicas(
-                ReplicationRequest request) throws IOException {
-              return listOfDatanodeDetails.subList(0, 2);
-            }
-          };
       replicationManager.start();
 
       //WHEN
-
       queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
           new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
               (short) 3));
@@ -138,7 +167,6 @@ public class TestReplicationManager {
       queue.processAll(1000L);
 
       //THEN
-
       Assert.assertEquals(1, trackReplicationEvents.size());
       Assert.assertEquals(1, copyEvents.size());
     } finally {
@@ -150,22 +178,14 @@ public class TestReplicationManager {
 
   @Test
   public void testCommandWatcher() throws InterruptedException, IOException {
+    LeaseManager<Long> rapidLeaseManager =
+        new LeaseManager<>("Test", 1000L);
 
-    Logger.getRootLogger().setLevel(Level.DEBUG);
-    LeaseManager<Long> leaseManager = new LeaseManager<>("Test", 1000L);
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        containerStateManager, queue, rapidLeaseManager);
 
     try {
-      leaseManager.start();
-
-      ReplicationManager replicationManager =
-          new ReplicationManager(containerPlacementPolicy,
-              containerStateManager, queue, leaseManager) {
-            @Override
-            protected List<DatanodeDetails> getCurrentReplicas(
-                ReplicationRequest request) throws IOException {
-              return listOfDatanodeDetails.subList(0, 2);
-            }
-          };
+      rapidLeaseManager.start();
       replicationManager.start();
 
       queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
@@ -192,8 +212,8 @@ public class TestReplicationManager {
       Assert.assertEquals(2, copyEvents.size());
 
     } finally {
-      if (leaseManager != null) {
-        leaseManager.shutdown();
+      if (rapidLeaseManager != null) {
+        rapidLeaseManager.shutdown();
       }
     }
   }
@@ -209,7 +229,7 @@ public class TestReplicationManager {
             ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
             PipelineID.randomId());
     pipeline.addMember(leader);
-    for (; i.hasNext(); ) {
+    while (i.hasNext()) {
       pipeline.addMember(i.next());
     }
     return pipeline;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org