You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/02/10 05:24:07 UTC
[hadoop] branch trunk updated: HDDS-1026. Reads should fail over to
alternate replica. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 965d26c HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.
965d26c is described below
commit 965d26c9c758bb0211cb918e95fa661194e771d3
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Sun Feb 10 10:53:16 2019 +0530
HDDS-1026. Reads should fail over to alternate replica. Contributed by Shashikant Banerjee.
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 51 +++++++--
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 9 +-
.../hadoop/hdds/scm/storage/BlockInputStream.java | 72 +++++++++---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 6 +-
...ientAsyncReply.java => XceiverClientReply.java} | 23 +++-
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 27 ++++-
.../hdds/scm/storage/ContainerProtocolCalls.java | 23 ++--
.../client/rpc/TestOzoneRpcClientAbstract.java | 126 +++++++++++++++++----
8 files changed, 266 insertions(+), 71 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 5c8ca26..6fa54a5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
/**
* A Client for the storageContainer protocol.
@@ -198,11 +199,27 @@ public class XceiverClientGrpc extends XceiverClientSpi {
@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
- return sendCommandWithRetry(request);
+ try {
+ XceiverClientReply reply;
+ reply = sendCommandWithRetry(request, null);
+ ContainerCommandResponseProto responseProto = reply.getResponse().get();
+ return responseProto;
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IOException("Failed to execute command " + request, e);
+ }
}
- public ContainerCommandResponseProto sendCommandWithRetry(
- ContainerCommandRequestProto request) throws IOException {
+ @Override
+ public XceiverClientReply sendCommand(
+ ContainerCommandRequestProto request, List<UUID> excludeDns)
+ throws IOException {
+ Preconditions.checkState(HddsUtils.isReadOnly(request));
+ return sendCommandWithRetry(request, excludeDns);
+ }
+
+ private XceiverClientReply sendCommandWithRetry(
+ ContainerCommandRequestProto request, List<UUID> excludeDns)
+ throws IOException {
ContainerCommandResponseProto responseProto = null;
// In case of an exception or an error, we will try to read from the
@@ -211,13 +228,24 @@ public class XceiverClientGrpc extends XceiverClientSpi {
// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
List<DatanodeDetails> dns = pipeline.getNodes();
- for (DatanodeDetails dn : dns) {
+ DatanodeDetails datanode = null;
+ List<DatanodeDetails> healthyDns =
+ excludeDns != null ? dns.stream().filter(dnId -> {
+ for (UUID excludeId : excludeDns) {
+ if (dnId.getUuid().equals(excludeId)) {
+ return false;
+ }
+ }
+ return true;
+ }).collect(Collectors.toList()) : dns;
+ for (DatanodeDetails dn : healthyDns) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
responseProto = sendCommandAsync(request, dn).getResponse().get();
+ datanode = dn;
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
break;
}
@@ -226,14 +254,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
.getUuidString(), e);
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
- throw new SCMSecurityException("Failed to authenticate with " +
- "GRPC XceiverServer with Ozone block token.");
+ throw new SCMSecurityException("Failed to authenticate with "
+ + "GRPC XceiverServer with Ozone block token.");
}
}
}
if (responseProto != null) {
- return responseProto;
+ return new XceiverClientReply(
+ CompletableFuture.completedFuture(responseProto), datanode.getUuid());
} else {
throw new IOException(
"Failed to execute command " + request + " on the pipeline "
@@ -256,10 +285,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* @throws IOException
*/
@Override
- public XceiverClientAsyncReply sendCommandAsync(
+ public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException {
- XceiverClientAsyncReply asyncReply =
+ XceiverClientReply asyncReply =
sendCommandAsync(request, pipeline.getFirstNode());
// TODO : for now make this API sync in nature as async requests are
@@ -272,7 +301,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
return asyncReply;
}
- private XceiverClientAsyncReply sendCommandAsync(
+ private XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, ExecutionException, InterruptedException {
if (closed) {
@@ -327,7 +356,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
});
requestObserver.onNext(request);
requestObserver.onCompleted();
- return new XceiverClientAsyncReply(replyFuture);
+ return new XceiverClientReply(replyFuture);
}
private void reconnect(DatanodeDetails dn, String encodedToken)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 338a198..c697b09 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -204,7 +204,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
return minIndex.isPresent() ? minIndex.getAsLong() : 0;
}
-
@Override
public long watchForCommit(long index, long timeout)
throws InterruptedException, ExecutionException, TimeoutException,
@@ -254,7 +253,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
commitInfoMap.remove(address);
LOG.info(
"Could not commit " + index + " to all the nodes. Server " + address
- + " has failed" + "Committed by majority.");
+ + " has failed." + " Committed by majority.");
}
return index;
}
@@ -266,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
* @return Response to the command
*/
@Override
- public XceiverClientAsyncReply sendCommandAsync(
+ public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request) {
- XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
+ XceiverClientReply asyncReply = new XceiverClientReply(null);
CompletableFuture<RaftClientReply> raftClientReply =
sendRequestAsync(request);
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
@@ -291,6 +290,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
updateCommitInfosMap(reply.getCommitInfos());
asyncReply.setLogIndex(reply.getLogIndex());
+ asyncReply.setDatanode(
+ RatisHelper.toDatanodeId(reply.getReplierId()));
}
return response;
} catch (InvalidProtocolBufferException e) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 5303efd..001feda 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdds.scm.storage;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
@@ -35,8 +38,11 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
/**
* An {@link InputStream} used by the REST service in combination with the
@@ -204,27 +210,57 @@ public class BlockInputStream extends InputStream implements Seekable {
// On every chunk read chunkIndex should be increased so as to read the
// next chunk
chunkIndex += 1;
- final ReadChunkResponseProto readChunkResponse;
+ XceiverClientReply reply;
+ ReadChunkResponseProto readChunkResponse = null;
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
- try {
- readChunkResponse = ContainerProtocolCalls
- .readChunk(xceiverClient, chunkInfo, blockID, traceID);
- } catch (IOException e) {
- if (e instanceof StorageContainerException) {
- throw e;
+ List<UUID> excludeDns = null;
+ ByteString byteString;
+ List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+ while (true) {
+ try {
+ reply = ContainerProtocolCalls
+ .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+ ContainerProtos.ContainerCommandResponseProto response;
+ response = reply.getResponse().get();
+ ContainerProtocolCalls.validateContainerResponse(response);
+ readChunkResponse = response.getReadChunk();
+ } catch (IOException e) {
+ if (e instanceof StorageContainerException) {
+ throw e;
+ }
+ throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IOException(
+ "Failed to execute ReadChunk command for chunk " + chunkInfo
+ .getChunkName(), e);
+ }
+ byteString = readChunkResponse.getData();
+ try {
+ if (byteString.size() != chunkInfo.getLen()) {
+ // Bytes read from chunk should be equal to chunk size.
+ throw new IOException(String
+ .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+ chunkInfo.getChunkName(), chunkInfo.getLen(),
+ byteString.size()));
+ }
+ ChecksumData checksumData =
+ ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+ Checksum.verifyChecksum(byteString, checksumData);
+ break;
+ } catch (IOException ioe) {
+ // we will end up in this situation only if the checksum mismatch
+ // happens or the length of the chunk mismatches.
+ // In this case, read should be retried on a different replica.
+ // TODO: Inform SCM of a possible corrupt container replica here
+ if (excludeDns == null) {
+ excludeDns = new ArrayList<>();
+ }
+ excludeDns.add(reply.getDatanode());
+ if (excludeDns.size() == dnList.size()) {
+ throw ioe;
+ }
}
- throw new IOException("Unexpected OzoneException: " + e.toString(), e);
- }
- ByteString byteString = readChunkResponse.getData();
- if (byteString.size() != chunkInfo.getLen()) {
- // Bytes read from chunk should be equal to chunk size.
- throw new IOException(String
- .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
- chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
}
- ChecksumData checksumData = ChecksumData.getFromProtoBuf(
- chunkInfo.getChecksumData());
- Checksum.verifyChecksum(byteString, checksumData);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 6cc0b54..4c6cd7b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
@@ -379,7 +379,7 @@ public class BlockOutputStream extends OutputStream {
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> flushFuture;
try {
- XceiverClientAsyncReply asyncReply =
+ XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
@@ -598,7 +598,7 @@ public class BlockOutputStream extends OutputStream {
traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
.getChunkName();
try {
- XceiverClientAsyncReply asyncReply =
+ XceiverClientReply asyncReply =
writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
similarity index 79%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
index 40d97bf..5678555 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
@@ -21,20 +21,29 @@ package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
+
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* This class represents the Async reply from XceiverClient.
*/
-public class XceiverClientAsyncReply {
+public class XceiverClientReply {
private CompletableFuture<ContainerCommandResponseProto> response;
private Long logIndex;
+ private UUID dnId;
- public XceiverClientAsyncReply(
+ public XceiverClientReply(
CompletableFuture<ContainerCommandResponseProto> response) {
- this.logIndex = (long)0;
+ this(response, null);
+ }
+
+ public XceiverClientReply(
+ CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) {
+ this.logIndex = (long) 0;
this.response = response;
+ this.dnId = dnId;
}
public CompletableFuture<ContainerCommandResponseProto> getResponse() {
@@ -49,6 +58,14 @@ public class XceiverClientAsyncReply {
this.logIndex = logIndex;
}
+ public UUID getDatanode() {
+ return dnId;
+ }
+
+ public void setDatanode(UUID datanodeId) {
+ this.dnId = datanodeId;
+ }
+
public void setResponse(
CompletableFuture<ContainerCommandResponseProto> response) {
this.response = response;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 9da74af..0d4b3a4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -104,7 +106,7 @@ public abstract class XceiverClientSpi implements Closeable {
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
try {
- XceiverClientAsyncReply reply;
+ XceiverClientReply reply;
reply = sendCommandAsync(request);
ContainerCommandResponseProto responseProto = reply.getResponse().get();
return responseProto;
@@ -114,13 +116,34 @@ public abstract class XceiverClientSpi implements Closeable {
}
/**
+ * Sends a given command to server and gets the reply back along with
+ * the server associated info.
+ * @param request Request
+ * @param excludeDns list of servers on which the command won't be sent to.
+ * @return Response to the command
+ * @throws IOException
+ */
+ public XceiverClientReply sendCommand(
+ ContainerCommandRequestProto request, List<UUID> excludeDns)
+ throws IOException {
+ try {
+ XceiverClientReply reply;
+ reply = sendCommandAsync(request);
+ reply.getResponse().get();
+ return reply;
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IOException("Failed to command " + request, e);
+ }
+ }
+
+ /**
* Sends a given command to server gets a waitable future back.
*
* @param request Request
* @return Response to the command
* @throws IOException
*/
- public abstract XceiverClientAsyncReply
+ public abstract XceiverClientReply
sendCommandAsync(ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 114b6e6..6296831 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
-import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -58,8 +58,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerResponseProto;
@@ -72,6 +70,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
@@ -199,7 +199,7 @@ public final class ContainerProtocolCalls {
* @throws InterruptedException
* @throws ExecutionException
*/
- public static XceiverClientAsyncReply putBlockAsync(
+ public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData,
String traceID)
throws IOException, InterruptedException, ExecutionException {
@@ -217,7 +217,6 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- xceiverClient.sendCommand(request);
return xceiverClient.sendCommandAsync(request);
}
@@ -228,11 +227,13 @@ public final class ContainerProtocolCalls {
* @param chunk information about chunk to read
* @param blockID ID of the block
* @param traceID container protocol call args
+ * @param excludeDns datanodes to exclude while executing the command
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
- public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
- ChunkInfo chunk, BlockID blockID, String traceID) throws IOException {
+ public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
+ ChunkInfo chunk, BlockID blockID, String traceID, List<UUID> excludeDns)
+ throws IOException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
@@ -251,9 +252,9 @@ public final class ContainerProtocolCalls {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
- ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
- validateContainerResponse(response);
- return response.getReadChunk();
+ XceiverClientReply reply =
+ xceiverClient.sendCommand(request, excludeDns);
+ return reply;
}
/**
@@ -302,7 +303,7 @@ public final class ContainerProtocolCalls {
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
- public static XceiverClientAsyncReply writeChunkAsync(
+ public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data, String traceID)
throws IOException, ExecutionException, InterruptedException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index c9c1c1c..fcf9465 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
@@ -879,7 +880,7 @@ public abstract class TestOzoneRpcClientAbstract {
// Write data into a key
OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes().length, ReplicationType.STAND_ALONE,
+ value.getBytes().length, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
out.write(value.getBytes());
out.close();
@@ -889,8 +890,6 @@ public abstract class TestOzoneRpcClientAbstract {
OzoneKey key = bucket.getKey(keyName);
long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
.getContainerID();
- long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
- .getLocalID();
// Get the container by traversing the datanodes. At least one of the
// datanode must have this container.
@@ -903,15 +902,114 @@ public abstract class TestOzoneRpcClientAbstract {
}
}
Assert.assertNotNull("Container not found", container);
+ corruptData(container, key);
+
+ // Try reading the key. Since the chunk file is corrupted, it should
+ // throw a checksum mismatch exception.
+ try {
+ OzoneInputStream is = bucket.readKey(keyName);
+ is.read(new byte[100]);
+ fail("Reading corrupted data should fail.");
+ } catch (OzoneChecksumException e) {
+ GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+ }
+ }
+
+ /**
+ * Tests reading a corrupted chunk file throws checksum exception.
+ * @throws IOException
+ */
+ @Test
+ public void testReadKeyWithCorruptedDataWithMutiNodes() throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String value = "sample value";
+ byte[] data = value.getBytes();
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ String keyName = UUID.randomUUID().toString();
+
+ // Write data into a key
+ OzoneOutputStream out = bucket.createKey(keyName,
+ value.getBytes().length, ReplicationType.RATIS,
+ ReplicationFactor.THREE, new HashMap<>());
+ out.write(value.getBytes());
+ out.close();
+
+ // We need to find the location of the chunk file corresponding to the
+ // data we just wrote.
+ OzoneKey key = bucket.getKey(keyName);
+ List<OzoneKeyLocation> keyLocation =
+ ((OzoneKeyDetails) key).getOzoneKeyLocations();
+ Assert.assertTrue("Key location not found in OM", !keyLocation.isEmpty());
+ long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+ .getContainerID();
+
+ // Get the container by traversing the datanodes.
+ List<Container> containerList = new ArrayList<>();
+ Container container;
+ for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
+ container = hddsDatanode.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ if (container != null) {
+ containerList.add(container);
+ if (containerList.size() == 3) {
+ break;
+ }
+ }
+ }
+ Assert.assertTrue("Container not found", !containerList.isEmpty());
+ corruptData(containerList.get(0), key);
+ // Try reading the key. Read will fail on the first node and will eventually
+ // failover to next replica
+ try {
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] b = new byte[data.length];
+ is.read(b);
+ Assert.assertTrue(Arrays.equals(b, data));
+ } catch (OzoneChecksumException e) {
+ fail("Reading corrupted data should not fail.");
+ }
+ corruptData(containerList.get(1), key);
+ // Try reading the key. Read will fail on the first node and will eventually
+ // failover to next replica
+ try {
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] b = new byte[data.length];
+ is.read(b);
+ Assert.assertTrue(Arrays.equals(b, data));
+ } catch (OzoneChecksumException e) {
+ fail("Reading corrupted data should not fail.");
+ }
+ corruptData(containerList.get(2), key);
+ // Try reading the key. Read will fail here as all the replica are corrupt
+ try {
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] b = new byte[data.length];
+ is.read(b);
+ fail("Reading corrupted data should fail.");
+ } catch (OzoneChecksumException e) {
+ GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
+ }
+ }
+
+ private void corruptData(Container container, OzoneKey key)
+ throws IOException {
+ long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+ .getContainerID();
+ long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
+ .getLocalID();
// From the containerData, get the block iterator for all the blocks in
// the container.
KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData();
- String containerPath = new File(containerData.getMetadataPath())
- .getParent();
- KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
- containerID, new File(containerPath));
+ String containerPath =
+ new File(containerData.getMetadataPath()).getParent();
+ KeyValueBlockIterator keyValueBlockIterator =
+ new KeyValueBlockIterator(containerID, new File(containerPath));
// Find the block corresponding to the key we put. We use the localID of
// the BlockData to identify out key.
@@ -926,8 +1024,8 @@ public abstract class TestOzoneRpcClientAbstract {
// Get the location of the chunk file
String chunkName = blockData.getChunks().get(0).getChunkName();
- String containreBaseDir = container.getContainerData().getVolume()
- .getHddsRootDir().getPath();
+ String containreBaseDir =
+ container.getContainerData().getVolume().getHddsRootDir().getPath();
File chunksLocationPath = KeyValueContainerLocationUtil
.getChunksLocationPath(containreBaseDir, SCM_ID, containerID);
File chunkFile = new File(chunksLocationPath, chunkName);
@@ -935,16 +1033,6 @@ public abstract class TestOzoneRpcClientAbstract {
// Corrupt the contents of the chunk file
String newData = new String("corrupted data");
FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
-
- // Try reading the key. Since the chunk file is corrupted, it should
- // throw a checksum mismatch exception.
- try {
- OzoneInputStream is = bucket.readKey(keyName);
- is.read(new byte[100]);
- fail("Reading corrupted data should fail.");
- } catch (OzoneChecksumException e) {
- GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
- }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org