You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/08/05 08:13:57 UTC

[hadoop] branch trunk updated: HDDS-1798. Propagate failure in writeStateMachineData to Ratis. Contributed by Supratim Deka (#1113)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8ea6e1  HDDS-1798. Propagate failure in writeStateMachineData to Ratis. Contributed by Supratim Deka (#1113)
f8ea6e1 is described below

commit f8ea6e1ce132c65d5ce11597818bbf972717711a
Author: supratimdeka <46...@users.noreply.github.com>
AuthorDate: Mon Aug 5 13:43:41 2019 +0530

    HDDS-1798. Propagate failure in writeStateMachineData to Ratis. Contributed by Supratim Deka (#1113)
---
 .../common/transport/server/ratis/CSMMetrics.java  | 10 ++++
 .../server/ratis/ContainerStateMachine.java        | 47 ++++++++++++----
 .../client/rpc/TestContainerStateMachine.java      |  8 +--
 .../rpc/TestContainerStateMachineFailures.java     | 64 +++++++++++-----------
 4 files changed, 78 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
index ebbec4d..ccf57cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -52,6 +52,7 @@ public class CSMMetrics {
 
   // Failure Metrics
   private @Metric MutableCounterLong numWriteStateMachineFails;
+  private @Metric MutableCounterLong numWriteDataFails;
   private @Metric MutableCounterLong numQueryStateMachineFails;
   private @Metric MutableCounterLong numApplyTransactionFails;
   private @Metric MutableCounterLong numReadStateMachineFails;
@@ -97,6 +98,10 @@ public class CSMMetrics {
     numWriteStateMachineFails.incr();
   }
 
+  public void incNumWriteDataFails() {
+    numWriteDataFails.incr();
+  }
+
   public void incNumQueryStateMachineFails() {
     numQueryStateMachineFails.incr();
   }
@@ -142,6 +147,11 @@ public class CSMMetrics {
   }
 
   @VisibleForTesting
+  public long getNumWriteDataFails() {
+    return numWriteDataFails.value();
+  }
+
+  @VisibleForTesting
   public long getNumQueryStateMachineFails() {
     return numQueryStateMachineFails.value();
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 768b37b..f4d4744 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -137,8 +137,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      writeChunkFutureMap;
+  private final ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
 
   // keeps track of the containers created per pipeline
   private final Set<Long> createContainerSet;
@@ -385,9 +385,15 @@ public class ContainerStateMachine extends BaseStateMachine {
     return response;
   }
 
+  private ContainerCommandResponseProto runCommandGetResponse(
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return dispatchCommand(requestProto, context);
+  }
+
   private Message runCommand(ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
-    return dispatchCommand(requestProto, context)::toByteString;
+    return runCommandGetResponse(requestProto, context)::toByteString;
   }
 
   private ExecutorService getCommandExecutor(
@@ -417,8 +423,11 @@ public class ContainerStateMachine extends BaseStateMachine {
             .build();
     // ensure the write chunk happens asynchronously in writeChunkExecutor pool
     // thread.
-    CompletableFuture<Message> writeChunkFuture = CompletableFuture
-        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
+    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
+        CompletableFuture.supplyAsync(() ->
+            runCommandGetResponse(requestProto, context), chunkExecutor);
+
+    CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@@ -427,15 +436,29 @@ public class ContainerStateMachine extends BaseStateMachine {
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
-      metrics.incNumBytesWrittenCount(
-          requestProto.getWriteChunk().getChunkData().getLen());
+      if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+        StorageContainerException sce =
+            new StorageContainerException(r.getMessage(), r.getResult());
+        LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName() + " Error message: " +
+            r.getMessage() + " Container Result: " + r.getResult());
+        metrics.incNumWriteDataFails();
+        raftFuture.completeExceptionally(sce);
+      } else {
+        metrics.incNumBytesWrittenCount(
+            requestProto.getWriteChunk().getChunkData().getLen());
+        LOG.debug(gid +
+            ": writeChunk writeStateMachineData  completed: blockId" +
+            write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+            write.getChunkData().getChunkName());
+        raftFuture.complete(r::toByteString);
+      }
+
       writeChunkFutureMap.remove(entryIndex);
-      LOG.debug(gid + ": writeChunk writeStateMachineData  completed: blockId" +
-           write.getBlockID() + " logIndex " + entryIndex + " chunkName "
-          + write.getChunkData().getChunkName());
       return r;
     });
-    return writeChunkFuture;
+    return raftFuture;
   }
 
   /*
@@ -544,7 +567,7 @@ public class ContainerStateMachine extends BaseStateMachine {
    */
   @Override
   public CompletableFuture<Void> flushStateMachineData(long index) {
-    List<CompletableFuture<Message>> futureList =
+    List<CompletableFuture<ContainerCommandResponseProto>> futureList =
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
             .map(Map.Entry::getValue).collect(Collectors.toList());
     return CompletableFuture.allOf(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 13e3eff..2c3cfab 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -139,13 +139,7 @@ public class TestContainerStateMachine {
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
 
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 744f687..469eeb0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -58,6 +59,8 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+    OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
@@ -96,6 +99,8 @@ public class TestContainerStateMachineFailures {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -126,10 +131,14 @@ public class TestContainerStateMachineFailures {
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
+    byte[] testData = "ratis".getBytes();
+    long written = 0;
     // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
     key.flush();
-    key.write("ratis".getBytes());
+    key.write(testData);
+    written += testData.length;
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
@@ -148,13 +157,7 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+    key.close();
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
@@ -170,27 +173,28 @@ public class TestContainerStateMachineFailures {
     HddsDispatcher dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
     Assert.assertTrue(dispatcher.getMissingContainerSet().isEmpty());
 
-    // restart the hdds datanode and see if the container is listed in the
-    // in the missing container set and not in the regular set
+    // restart the hdds datanode, container should not be in the regular set
     cluster.restartHddsDatanode(0, true);
     ozoneContainer = cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().getContainer();
-    dispatcher = (HddsDispatcher) ozoneContainer.getDispatcher();
-
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-    Assert.assertTrue(dispatcher.getMissingContainerSet()
-        .contains(containerID));
-    ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
-    request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setContainerID(containerID);
-    request.setCreateContainer(
-        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
-    request.setDatanodeUuid(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails().getUuidString());
-    Assert.assertEquals(ContainerProtos.Result.CONTAINER_MISSING,
-        dispatcher.dispatch(request.build(), null).getResult());
+
+    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).getKey("ratis");
+
+    /**
+     * Ensure length of data stored in key is equal to number of bytes written.
+     */
+    Assert.assertTrue("Number of bytes stored in the key is not equal " +
+        "to number of bytes written.", keyDetails.getDataSize() == written);
+
+    /**
+     * Pending data from the second write should get written to a new container
+     * during key.close() because the first container is UNHEALTHY by that time
+     */
+    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -224,13 +228,9 @@ public class TestContainerStateMachineFailures {
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-    try {
-      key.close();
-      Assert.fail();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains(
-          "Requested operation not allowed as ContainerState is UNHEALTHY"));
-    }
+
+    key.close();
+
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org