Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/02/10 05:24:07 UTC

[hadoop] branch trunk updated: HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 965d26c  HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.
965d26c is described below

commit 965d26c9c758bb0211cb918e95fa661194e771d3
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Sun Feb 10 10:53:16 2019 +0530

    HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  51 +++++++--
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |   9 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  72 +++++++++---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   6 +-
 ...ientAsyncReply.java => XceiverClientReply.java} |  23 +++-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |  27 ++++-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  23 ++--
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 126 +++++++++++++++++----
 8 files changed, 266 insertions(+), 71 deletions(-)
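
In short: with this change, reads no longer give up when the first replica
returns bad data. The client records which datanode served a chunk, verifies
length and checksum, and on a mismatch retries the read against a replica that
is not yet on an exclude list, failing only once every replica has been tried.
A minimal, self-contained sketch of that loop follows (hypothetical Replica
type and helper methods, not the actual Ozone classes):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Sketch of the failover-read pattern this commit introduces in
 * BlockInputStream/XceiverClientGrpc. Replica and its methods are
 * assumptions made for illustration.
 */
public class FailoverReadSketch {

  /** Stand-in for a datanode replica. */
  interface Replica {
    UUID getUuid();
    byte[] readChunk() throws IOException;   // may return corrupt data
  }

  static byte[] readWithFailover(List<Replica> replicas, int expectedLen)
      throws IOException {
    List<UUID> excludeDns = new ArrayList<>(); // mirrors excludeDns in the patch
    while (true) {
      Replica dn = pickHealthyReplica(replicas, excludeDns);
      byte[] data = dn.readChunk();
      try {
        if (data.length != expectedLen) {      // length check, as in BlockInputStream
          throw new IOException("Inconsistent read: expected " + expectedLen
              + " bytes, got " + data.length);
        }
        verifyChecksum(data);                  // placeholder for Checksum.verifyChecksum
        return data;                           // a healthy replica answered
      } catch (IOException ioe) {
        excludeDns.add(dn.getUuid());          // do not retry this replica
        if (excludeDns.size() == replicas.size()) {
          throw ioe;                           // every replica failed; give up
        }
      }
    }
  }

  /** Returns the first replica whose UUID is not on the exclude list. */
  static Replica pickHealthyReplica(List<Replica> replicas, List<UUID> exclude)
      throws IOException {
    for (Replica r : replicas) {
      if (!exclude.contains(r.getUuid())) {
        return r;
      }
    }
    throw new IOException("No replica left to try");
  }

  /** Hypothetical stand-in for the real checksum verification. */
  static void verifyChecksum(byte[] data) throws IOException {
    // real code: Checksum.verifyChecksum(byteString, checksumData)
  }
}

The patch itself splits this across two layers: XceiverClientGrpc skips
excluded datanodes when picking a node to send to, and BlockInputStream adds
the datanode recorded in the XceiverClientReply to the exclude list whenever
length or checksum verification fails.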

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 5c8ca26..6fa54a5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * A Client for the storageContainer protocol.
@@ -198,11 +199,27 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   @Override
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
-    return sendCommandWithRetry(request);
+    try {
+      XceiverClientReply reply;
+      reply = sendCommandWithRetry(request, null);
+      ContainerCommandResponseProto responseProto = reply.getResponse().get();
+      return responseProto;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Failed to execute command " + request, e);
+    }
   }
 
-  public ContainerCommandResponseProto sendCommandWithRetry(
-      ContainerCommandRequestProto request) throws IOException {
+  @Override
+  public XceiverClientReply sendCommand(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
+    Preconditions.checkState(HddsUtils.isReadOnly(request));
+    return sendCommandWithRetry(request, excludeDns);
+  }
+
+  private XceiverClientReply sendCommandWithRetry(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
     ContainerCommandResponseProto responseProto = null;
 
     // In case of an exception or an error, we will try to read from the
@@ -211,13 +228,24 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     List<DatanodeDetails> dns = pipeline.getNodes();
-    for (DatanodeDetails dn : dns) {
+    DatanodeDetails datanode = null;
+    List<DatanodeDetails> healthyDns =
+        excludeDns != null ? dns.stream().filter(dnId -> {
+          for (UUID excludeId : excludeDns) {
+            if (dnId.getUuid().equals(excludeId)) {
+              return false;
+            }
+          }
+          return true;
+        }).collect(Collectors.toList()) : dns;
+    for (DatanodeDetails dn : healthyDns) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
         responseProto = sendCommandAsync(request, dn).getResponse().get();
+        datanode = dn;
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -226,14 +254,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             .getUuidString(), e);
         if (Status.fromThrowable(e.getCause()).getCode()
             == Status.UNAUTHENTICATED.getCode()) {
-          throw new SCMSecurityException("Failed to authenticate with " +
-              "GRPC XceiverServer with Ozone block token.");
+          throw new SCMSecurityException("Failed to authenticate with "
+              + "GRPC XceiverServer with Ozone block token.");
         }
       }
     }
 
     if (responseProto != null) {
-      return responseProto;
+      return new XceiverClientReply(
+          CompletableFuture.completedFuture(responseProto), datanode.getUuid());
     } else {
       throw new IOException(
           "Failed to execute command " + request + " on the pipeline "
@@ -256,10 +285,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public XceiverClientAsyncReply sendCommandAsync(
+  public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    XceiverClientAsyncReply asyncReply =
+    XceiverClientReply asyncReply =
         sendCommandAsync(request, pipeline.getFirstNode());
 
     // TODO : for now make this API sync in nature as async requests are
@@ -272,7 +301,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return asyncReply;
   }
 
-  private XceiverClientAsyncReply sendCommandAsync(
+  private XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request, DatanodeDetails dn)
       throws IOException, ExecutionException, InterruptedException {
     if (closed) {
@@ -327,7 +356,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
-    return new XceiverClientAsyncReply(replyFuture);
+    return new XceiverClientReply(replyFuture);
   }
 
   private void reconnect(DatanodeDetails dn, String encodedToken)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 338a198..c697b09 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -204,7 +204,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return minIndex.isPresent() ? minIndex.getAsLong() : 0;
   }
 
-
   @Override
   public long watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
@@ -254,7 +253,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       commitInfoMap.remove(address);
       LOG.info(
           "Could not commit " + index + " to all the nodes. Server " + address
-              + " has failed" + "Committed by majority.");
+              + " has failed." + " Committed by majority.");
     }
     return index;
   }
@@ -266,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * @return Response to the command
    */
   @Override
-  public XceiverClientAsyncReply sendCommandAsync(
+  public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request) {
-    XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
+    XceiverClientReply asyncReply = new XceiverClientReply(null);
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
@@ -291,6 +290,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
                 if (response.getResult() == ContainerProtos.Result.SUCCESS) {
                   updateCommitInfosMap(reply.getCommitInfos());
                   asyncReply.setLogIndex(reply.getLogIndex());
+                  asyncReply.setDatanode(
+                      RatisHelper.toDatanodeId(reply.getReplierId()));
                 }
                 return response;
               } catch (InvalidProtocolBufferException e) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 5303efd..001feda 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -35,8 +38,11 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
@@ -204,27 +210,57 @@ public class BlockInputStream extends InputStream implements Seekable {
     // On every chunk read chunkIndex should be increased so as to read the
     // next chunk
     chunkIndex += 1;
-    final ReadChunkResponseProto readChunkResponse;
+    XceiverClientReply reply;
+    ReadChunkResponseProto readChunkResponse = null;
     final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    try {
-      readChunkResponse = ContainerProtocolCalls
-          .readChunk(xceiverClient, chunkInfo, blockID, traceID);
-    } catch (IOException e) {
-      if (e instanceof StorageContainerException) {
-        throw e;
+    List<UUID> excludeDns = null;
+    ByteString byteString;
+    List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+    while (true) {
+      try {
+        reply = ContainerProtocolCalls
+            .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+        ContainerProtos.ContainerCommandResponseProto response;
+        response = reply.getResponse().get();
+        ContainerProtocolCalls.validateContainerResponse(response);
+        readChunkResponse = response.getReadChunk();
+      } catch (IOException e) {
+        if (e instanceof StorageContainerException) {
+          throw e;
+        }
+        throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new IOException(
+            "Failed to execute ReadChunk command for chunk  " + chunkInfo
+                .getChunkName(), e);
+      }
+      byteString = readChunkResponse.getData();
+      try {
+        if (byteString.size() != chunkInfo.getLen()) {
+          // Bytes read from chunk should be equal to chunk size.
+          throw new IOException(String
+              .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                  chunkInfo.getChunkName(), chunkInfo.getLen(),
+                  byteString.size()));
+        }
+        ChecksumData checksumData =
+            ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+        Checksum.verifyChecksum(byteString, checksumData);
+        break;
+      } catch (IOException ioe) {
+        // We will end up in this situation only if a checksum mismatch
+        // happens or the chunk length does not match.
+        // In this case, read should be retried on a different replica.
+        // TODO: Inform SCM of a possible corrupt container replica here
+        if (excludeDns == null) {
+          excludeDns = new ArrayList<>();
+        }
+        excludeDns.add(reply.getDatanode());
+        if (excludeDns.size() == dnList.size()) {
+          throw ioe;
+        }
       }
-      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-    }
-    ByteString byteString = readChunkResponse.getData();
-    if (byteString.size() != chunkInfo.getLen()) {
-      // Bytes read from chunk should be equal to chunk size.
-      throw new IOException(String
-          .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-              chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
     }
-    ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-        chunkInfo.getChecksumData());
-    Checksum.verifyChecksum(byteString, checksumData);
 
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 6cc0b54..4c6cd7b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -379,7 +379,7 @@ public class BlockOutputStream extends OutputStream {
     CompletableFuture<ContainerProtos.
         ContainerCommandResponseProto> flushFuture;
     try {
-      XceiverClientAsyncReply asyncReply =
+      XceiverClientReply asyncReply =
           putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
@@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream {
         traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
             .getChunkName();
     try {
-      XceiverClientAsyncReply asyncReply =
+      XceiverClientReply asyncReply =
           writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
similarity index 79%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
index 40d97bf..5678555 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
@@ -21,20 +21,29 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
+
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * This class represents the reply from XceiverClient.
  */
-public class XceiverClientAsyncReply {
+public class XceiverClientReply {
 
   private CompletableFuture<ContainerCommandResponseProto> response;
   private Long logIndex;
+  private UUID dnId;
 
-  public XceiverClientAsyncReply(
+  public XceiverClientReply(
       CompletableFuture<ContainerCommandResponseProto> response) {
-    this.logIndex = (long)0;
+    this(response, null);
+  }
+
+  public XceiverClientReply(
+      CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) {
+    this.logIndex = (long) 0;
     this.response = response;
+    this.dnId = dnId;
   }
 
   public CompletableFuture<ContainerCommandResponseProto> getResponse() {
@@ -49,6 +58,14 @@ public class XceiverClientAsyncReply {
     this.logIndex = logIndex;
   }
 
+  public UUID getDatanode() {
+    return dnId;
+  }
+
+  public void setDatanode(UUID datanodeId) {
+    this.dnId = datanodeId;
+  }
+
   public void setResponse(
       CompletableFuture<ContainerCommandResponseProto> response) {
     this.response = response;
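
The rename above is not just cosmetic: the reply object now carries the UUID
of the datanode that answered, which is what lets BlockInputStream exclude
exactly the replica that served bad data. A stripped-down model of the shape
(generic payload instead of ContainerCommandResponseProto; a sketch, not the
real class):

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Response future plus the identity of the replica that produced it. */
class ReplySketch<T> {
  private final CompletableFuture<T> response;
  private UUID dnId;   // replica that answered, if known

  ReplySketch(CompletableFuture<T> response, UUID dnId) {
    this.response = response;
    this.dnId = dnId;
  }

  CompletableFuture<T> getResponse() {
    return response;
  }

  UUID getDatanode() {
    return dnId;
  }

  void setDatanode(UUID id) {
    this.dnId = id;
  }
}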
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 9da74af..0d4b3a4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,7 +106,7 @@ public abstract class XceiverClientSpi implements Closeable {
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
     try {
-      XceiverClientAsyncReply reply;
+      XceiverClientReply reply;
       reply = sendCommandAsync(request);
       ContainerCommandResponseProto responseProto = reply.getResponse().get();
       return responseProto;
@@ -114,13 +116,34 @@ public abstract class XceiverClientSpi implements Closeable {
   }
 
   /**
+   * Sends a given command to the server and gets the reply back along with
+   * the info of the server that replied.
+   * @param request Request
+   * @param excludeDns list of datanodes to which the command will not be sent
+   * @return Response to the command
+   * @throws IOException
+   */
+  public XceiverClientReply sendCommand(
+      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      throws IOException {
+    try {
+      XceiverClientReply reply;
+      reply = sendCommandAsync(request);
+      reply.getResponse().get();
+      return reply;
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Failed to command " + request, e);
+    }
+  }
+
+  /**
    * Sends a given command to the server and gets a waitable future back.
    *
    * @param request Request
    * @return Response to the command
    * @throws IOException
    */
-  public abstract XceiverClientAsyncReply
+  public abstract XceiverClientReply
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException;
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 114b6e6..6296831 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadContainerResponseProto;
@@ -72,6 +70,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
 import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -199,7 +199,7 @@ public final class ContainerProtocolCalls  {
    * @throws InterruptedException
    * @throws ExecutionException
    */
-  public static XceiverClientAsyncReply putBlockAsync(
+  public static XceiverClientReply putBlockAsync(
       XceiverClientSpi xceiverClient, BlockData containerBlockData,
       String traceID)
       throws IOException, InterruptedException, ExecutionException {
@@ -217,7 +217,6 @@ public final class ContainerProtocolCalls  {
       builder.setEncodedToken(encodedToken);
     }
     ContainerCommandRequestProto request = builder.build();
-    xceiverClient.sendCommand(request);
     return xceiverClient.sendCommandAsync(request);
   }
 
@@ -228,11 +227,13 @@ public final class ContainerProtocolCalls  {
    * @param chunk information about chunk to read
    * @param blockID ID of the block
    * @param traceID container protocol call args
+   * @param excludeDns datanodes to exclude while executing the command
    * @return container protocol read chunk response
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
-      ChunkInfo chunk, BlockID blockID, String traceID) throws IOException {
+  public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
+      ChunkInfo chunk, BlockID blockID, String traceID, List<UUID> excludeDns)
+      throws IOException {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
@@ -251,9 +252,9 @@ public final class ContainerProtocolCalls  {
       builder.setEncodedToken(encodedToken);
     }
     ContainerCommandRequestProto request = builder.build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
-    return response.getReadChunk();
+    XceiverClientReply reply =
+        xceiverClient.sendCommand(request, excludeDns);
+    return reply;
   }
 
   /**
@@ -302,7 +303,7 @@ public final class ContainerProtocolCalls  {
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static XceiverClientAsyncReply writeChunkAsync(
+  public static XceiverClientReply writeChunkAsync(
       XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
       ByteString data, String traceID)
       throws IOException, ExecutionException, InterruptedException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index c9c1c1c..fcf9465 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
@@ -879,7 +880,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
     // Write data into a key
     OzoneOutputStream out = bucket.createKey(keyName,
-        value.getBytes().length, ReplicationType.STAND_ALONE,
+        value.getBytes().length, ReplicationType.RATIS,
         ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
@@ -889,8 +890,6 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneKey key = bucket.getKey(keyName);
     long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
         .getContainerID();
-    long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
-        .getLocalID();
 
     // Get the container by traversing the datanodes. At least one of the
     // datanodes must have this container.
@@ -903,15 +902,114 @@ public abstract class TestOzoneRpcClientAbstract {
       }
     }
     Assert.assertNotNull("Container not found", container);
+    corruptData(container, key);
+
+    // Try reading the key. Since the chunk file is corrupted, it should
+    // throw a checksum mismatch exception.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      is.read(new byte[100]);
+      fail("Reading corrupted data should fail.");
+    } catch (OzoneChecksumException e) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+    }
+  }
+
+  /**
+   * Tests that reading a corrupted chunk file throws a checksum exception.
+   * @throws IOException
+   */
+  @Test
+  public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
 
+    String value = "sample value";
+    byte[] data = value.getBytes();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    // Write data into a key
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes().length, ReplicationType.RATIS,
+        ReplicationFactor.THREE, new HashMap<>());
+    out.write(value.getBytes());
+    out.close();
+
+    // We need to find the location of the chunk file corresponding to the
+    // data we just wrote.
+    OzoneKey key = bucket.getKey(keyName);
+    List<OzoneKeyLocation> keyLocation =
+        ((OzoneKeyDetails) key).getOzoneKeyLocations();
+    Assert.assertTrue("Key location not found in OM", !keyLocation.isEmpty());
+    long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getContainerID();
+
+    // Get the container by traversing the datanodes.
+    List<Container> containerList = new ArrayList<>();
+    Container container;
+    for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
+      container = hddsDatanode.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID);
+      if (container != null) {
+        containerList.add(container);
+        if (containerList.size() == 3) {
+          break;
+        }
+      }
+    }
+    Assert.assertTrue("Container not found", !containerList.isEmpty());
+    corruptData(containerList.get(0), key);
+    // Try reading the key. The read will fail on the first node and will
+    // eventually fail over to the next replica.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, data));
+    } catch (OzoneChecksumException e) {
+      fail("Reading corrupted data should not fail.");
+    }
+    corruptData(containerList.get(1), key);
+    // Try reading the key. The read will fail on the first node and will
+    // eventually fail over to the next replica.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, data));
+    } catch (OzoneChecksumException e) {
+      fail("Reading corrupted data should not fail.");
+    }
+    corruptData(containerList.get(2), key);
+    // Try reading the key. The read will fail here as all the replicas are corrupt.
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[data.length];
+      is.read(b);
+      fail("Reading corrupted data should fail.");
+    } catch (OzoneChecksumException e) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+    }
+  }
+
+  private void corruptData(Container container, OzoneKey key)
+      throws IOException {
+    long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getContainerID();
+    long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+        .getLocalID();
     // From the containerData, get the block iterator for all the blocks in
     // the container.
     KeyValueContainerData containerData =
         (KeyValueContainerData) container.getContainerData();
-    String containerPath = new File(containerData.getMetadataPath())
-        .getParent();
-    KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
-        containerID, new File(containerPath));
+    String containerPath =
+        new File(containerData.getMetadataPath()).getParent();
+    KeyValueBlockIterator keyValueBlockIterator =
+        new KeyValueBlockIterator(containerID, new File(containerPath));
 
     // Find the block corresponding to the key we put. We use the localID of
     // the BlockData to identify our key.
@@ -926,8 +1024,8 @@ public abstract class TestOzoneRpcClientAbstract {
 
     // Get the location of the chunk file
     String chunkName = blockData.getChunks().get(0).getChunkName();
-    String containreBaseDir = container.getContainerData().getVolume()
-        .getHddsRootDir().getPath();
+    String containreBaseDir =
+        container.getContainerData().getVolume().getHddsRootDir().getPath();
     File chunksLocationPath = KeyValueContainerLocationUtil
         .getChunksLocationPath(containreBaseDir, SCM_ID, containerID);
     File chunkFile = new File(chunksLocationPath, chunkName);
@@ -935,16 +1033,6 @@ public abstract class TestOzoneRpcClientAbstract {
     // Corrupt the contents of the chunk file
     String newData = new String("corrupted data");
     FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
-
-    // Try reading the key. Since the chunk file is corrupted, it should
-    // throw a checksum mismatch exception.
-    try {
-      OzoneInputStream is = bucket.readKey(keyName);
-      is.read(new byte[100]);
-      fail("Reading corrupted data should fail.");
-    } catch (OzoneChecksumException e) {
-      GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
-    }
   }
 
   @Test

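An aside for readers of the XceiverClientGrpc hunk: the loop-inside-a-lambda
that builds healthyDns is simply an exclude-list filter. A standalone,
simplified equivalent (plain UUID lists instead of DatanodeDetails, so a
sketch rather than the patch code):

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

public class ExcludeFilterSketch {
  /** Keep only the node IDs that are not on the exclude list. */
  static List<UUID> healthyNodes(List<UUID> nodes, List<UUID> exclude) {
    if (exclude == null) {
      return nodes;   // nothing excluded; every replica is a candidate
    }
    return nodes.stream()
        .filter(id -> !exclude.contains(id))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    UUID a = UUID.randomUUID(), b = UUID.randomUUID(), c = UUID.randomUUID();
    // With b excluded, only a and c remain as read candidates.
    System.out.println(healthyNodes(Arrays.asList(a, b, c), Arrays.asList(b)));
  }
}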

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org