You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/07 18:12:25 UTC
[ozone] 32/40: HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 06bc9689ca3cf870355b64467041229b0223d738
Author: hao guo <gu...@360.cn>
AuthorDate: Mon Mar 28 16:16:31 2022 +0800
HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)
(cherry picked from commit 88c4d59d9b780ad29cbf0bd1f7a3981a3b697c98)
(cherry picked from commit 0a432aa0eb57bd25d63e656d0ec6e96a05689ac7)
(cherry picked from commit 99b414df25ff29d4ec357e5cb96ce08ee4627611)
---
.../hdds/scm/storage/AbstractDataStreamOutput.java | 130 +++++++++++++++++++++
.../hdds/scm/storage/BlockDataStreamOutput.java | 36 +-----
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 32 +++++
.../ozone/container/ContainerTestHelper.java | 12 ++
.../server/ratis/ContainerStateMachine.java | 35 ++++--
.../ozone/container/keyvalue/KeyValueHandler.java | 33 +++---
.../keyvalue/impl/KeyValueStreamDataChannel.java | 2 +-
.../client/io/BlockDataStreamOutputEntryPool.java | 26 -----
.../ozone/client/io/KeyDataStreamOutput.java | 121 ++-----------------
.../client/rpc/TestBlockDataStreamOutput.java | 4 +-
.../rpc/TestContainerStateMachineStream.java | 59 ++++++++--
11 files changed, 279 insertions(+), 211 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
new file mode 100644
index 0000000000..e29670d781
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Map;
+
+/**
+ * This class is used for error handling methods.
+ */
+public abstract class AbstractDataStreamOutput
+ implements ByteBufferStreamOutput {
+
+ private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+ private int retryCount;
+ private boolean isException;
+
+ protected AbstractDataStreamOutput(
+ Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap) {
+ this.retryPolicyMap = retryPolicyMap;
+ this.isException = false;
+ this.retryCount = 0;
+ }
+
+ @VisibleForTesting
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ protected void resetRetryCount() {
+ retryCount = 0;
+ }
+
+ protected boolean isException() {
+ return isException;
+ }
+
+ /**
+ * Checks if the provided exception signifies retry failure in ratis client.
+ * In case of retry failure, ratis client throws RaftRetryFailureException
+ * and all succeeding operations are failed with AlreadyClosedException.
+ */
+ protected boolean checkForRetryFailure(Throwable t) {
+ return t instanceof RaftRetryFailureException
+ || t instanceof AlreadyClosedException;
+ }
+
+ // Every container specific exception from datanode will be seen as
+ // StorageContainerException
+ protected boolean checkIfContainerToExclude(Throwable t) {
+ return t instanceof StorageContainerException;
+ }
+
+ protected void setExceptionAndThrow(IOException ioe) throws IOException {
+ isException = true;
+ throw ioe;
+ }
+
+ protected void handleRetry(IOException exception) throws IOException {
+ RetryPolicy retryPolicy = retryPolicyMap
+ .get(HddsClientUtils.checkForException(exception).getClass());
+ if (retryPolicy == null) {
+ retryPolicy = retryPolicyMap.get(Exception.class);
+ }
+ handleRetry(exception, retryPolicy);
+ }
+
+ protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
+ throws IOException {
+ RetryPolicy.RetryAction action = null;
+ try {
+ action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
+ } catch (Exception e) {
+ setExceptionAndThrow(new IOException(e));
+ }
+ if (action != null &&
+ action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ String msg = "";
+ if (action.reason != null) {
+ msg = "Retry request failed. " + action.reason;
+ }
+ setExceptionAndThrow(new IOException(msg, exception));
+ }
+
+ // Throw the exception if the thread is interrupted
+ if (Thread.currentThread().isInterrupted()) {
+ setExceptionAndThrow(exception);
+ }
+ Preconditions.checkNotNull(action);
+ Preconditions.checkArgument(
+ action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ IOException ioe = (IOException) new InterruptedIOException(
+ "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+ .initCause(e);
+ setExceptionAndThrow(ioe);
+ }
+ }
+ retryCount++;
+ }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 3df5eb0e12..d5b9dd9d81 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -44,8 +45,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RoutingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,44 +207,13 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (isDatastreamPipelineMode) {
return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
.stream(message.getContent().asReadOnlyByteBuffer(),
- getRoutingTable(pipeline));
+ RatisHelper.getRoutingTable(pipeline));
} else {
return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
.stream(message.getContent().asReadOnlyByteBuffer());
}
}
- public RoutingTable getRoutingTable(Pipeline pipeline) {
- RaftPeerId primaryId = null;
- List<RaftPeerId> raftPeers = new ArrayList<>();
-
- for (DatanodeDetails dn : pipeline.getNodes()) {
- final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
- try {
- if (dn == pipeline.getFirstNode()) {
- primaryId = raftPeerId;
- }
- } catch (IOException e) {
- LOG.error("Can not get FirstNode from the pipeline: {} with " +
- "exception: {}", pipeline.toString(), e.getLocalizedMessage());
- return null;
- }
- raftPeers.add(raftPeerId);
- }
-
- RoutingTable.Builder builder = RoutingTable.newBuilder();
- RaftPeerId previousId = primaryId;
- for (RaftPeerId peerId : raftPeers) {
- if (peerId.equals(primaryId)) {
- continue;
- }
- builder.addSuccessor(previousId, peerId);
- previousId = peerId;
- }
-
- return builder.build();
- }
-
public BlockID getBlockID() {
return blockID.get();
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index c50a184285..e431c67df7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -55,6 +55,7 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -425,4 +426,35 @@ public final class RatisHelper {
throw new RuntimeException(e);
}
}
+
+ public static RoutingTable getRoutingTable(Pipeline pipeline) {
+ RaftPeerId primaryId = null;
+ List<RaftPeerId> raftPeers = new ArrayList<>();
+
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
+ try {
+ if (dn == pipeline.getFirstNode()) {
+ primaryId = raftPeerId;
+ }
+ } catch (IOException e) {
+ LOG.error("Can not get FirstNode from the pipeline: {} with " +
+ "exception: {}", pipeline.toString(), e.getLocalizedMessage());
+ return null;
+ }
+ raftPeers.add(raftPeerId);
+ }
+
+ RoutingTable.Builder builder = RoutingTable.newBuilder();
+ RaftPeerId previousId = primaryId;
+ for (RaftPeerId peerId : raftPeers) {
+ if (peerId.equals(primaryId)) {
+ continue;
+ }
+ builder.addSuccessor(previousId, peerId);
+ previousId = peerId;
+ }
+
+ return builder.build();
+ }
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 09fff2371e..14ccad18b9 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -546,6 +546,18 @@ public final class ContainerTestHelper {
return String.format("%1$" + length + "s", string);
}
+ public static byte[] generateData(int length, boolean random) {
+ final byte[] data = new byte[length];
+ if (random) {
+ ThreadLocalRandom.current().nextBytes(data);
+ } else {
+ for (int i = 0; i < length; i++) {
+ data[i] = (byte) i;
+ }
+ }
+ return data;
+ }
+
/**
* Construct fake protobuf messages for various types of requests.
* This is tedious, however necessary to test. Protobuf classes are final
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3ef9477d97..916d3e7f5b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.util.Time;
@@ -549,19 +550,29 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
- return CompletableFuture.supplyAsync(() -> {
- if (stream == null) {
- return JavaUtils.completeExceptionally(
- new IllegalStateException("DataStream is null"));
- }
- if (stream.getDataChannel().isOpen()) {
- return JavaUtils.completeExceptionally(
- new IllegalStateException(
- "DataStream: " + stream + " is not closed properly"));
- } else {
- return CompletableFuture.completedFuture(null);
+ if (stream == null) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "DataStream is null"));
+ }
+ final DataChannel dataChannel = stream.getDataChannel();
+ if (dataChannel.isOpen()) {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "DataStream: " + stream + " is not closed properly"));
+ }
+
+ final CompletableFuture<ContainerCommandResponseProto> f;
+ if (dataChannel instanceof KeyValueStreamDataChannel) {
+ f = CompletableFuture.completedFuture(null);
+ } else {
+ return JavaUtils.completeExceptionally(new IllegalStateException(
+ "Unexpected DataChannel " + dataChannel.getClass()));
+ }
+ return f.whenComplete((res, e) -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PutBlock {} Term: {} Index: {}",
+ res.getResult(), entry.getTerm(), entry.getIndex());
}
- }, executor);
+ });
}
private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index eafea02216..c387eb5af6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -185,13 +185,21 @@ public class KeyValueHandler extends Handler {
@Override
public StateMachine.DataChannel getStreamDataChannel(
- Container container, ContainerCommandRequestProto msg)
- throws StorageContainerException {
+ Container container, ContainerCommandRequestProto msg)
+ throws StorageContainerException {
KeyValueContainer kvContainer = (KeyValueContainer) container;
checkContainerOpen(kvContainer);
- BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
- return chunkManager.getStreamDataChannel(kvContainer,
- blockID, metrics);
+
+ if (msg.hasWriteChunk()) {
+ BlockID blockID =
+ BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+
+ return chunkManager.getStreamDataChannel(kvContainer,
+ blockID, metrics);
+ } else {
+ throw new StorageContainerException("Malformed request.",
+ ContainerProtos.Result.IO_EXCEPTION);
+ }
}
@Override
@@ -274,10 +282,14 @@ public class KeyValueHandler extends Handler {
ContainerCommandResponseProto handleStreamInit(
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
- if (!request.hasWriteChunk()) {
+ final BlockID blockID;
+ if (request.hasWriteChunk()) {
+ WriteChunkRequestProto writeChunk = request.getWriteChunk();
+ blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
+ } else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Malformed Write Chunk request. trace ID: {}",
- request.getTraceID());
+ LOG.debug("Malformed {} request. trace ID: {}",
+ request.getCmdType(), request.getTraceID());
}
return malformedRequest(request);
}
@@ -285,13 +297,8 @@ public class KeyValueHandler extends Handler {
String path = null;
try {
checkContainerOpen(kvContainer);
-
- WriteChunkRequestProto writeChunk = request.getWriteChunk();
- BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
-
path = chunkManager
.streamInit(kvContainer, blockID);
-
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 14ead4ea86..66723031f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -28,7 +28,7 @@ import java.io.File;
/**
* This class is used to get the DataChannel for streaming.
*/
-class KeyValueStreamDataChannel extends StreamDataChannelBase {
+public class KeyValueStreamDataChannel extends StreamDataChannelBase {
KeyValueStreamDataChannel(File file, ContainerData containerData,
ContainerMetrics metrics)
throws StorageContainerException {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 00cda7844a..e51242cc10 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -18,10 +18,8 @@
*/
package org.apache.hadoop.ozone.client.io;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -88,30 +86,6 @@ public class BlockDataStreamOutputEntryPool {
this.bufferList = new ArrayList<>();
}
- /**
- * A constructor for testing purpose only.
- *
- * @see KeyDataStreamOutput#KeyDataStreamOutput()
- */
- @VisibleForTesting
- BlockDataStreamOutputEntryPool() {
- streamEntries = new ArrayList<>();
- omClient = null;
- keyArgs = null;
- xceiverClientFactory = null;
- config =
- new OzoneConfiguration().getObject(OzoneClientConfig.class);
- config.setStreamBufferSize(0);
- config.setStreamBufferMaxSize(0);
- config.setStreamBufferFlushSize(0);
- config.setStreamBufferFlushDelay(false);
- requestID = null;
- int chunkSize = 0;
- currentStreamIndex = 0;
- openID = -1;
- excludeList = new ExcludeList();
- }
-
/**
* When a key is opened, it is possible that there are some blocks already
* allocated to it for this open session. In this case, to make use of these
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 2540e42e24..dc5c3a016d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -28,31 +28,22 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.hdds.scm.storage.AbstractDataStreamOutput;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
-import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
/**
* Maintaining a list of BlockInputStream. Write based on offset.
@@ -63,7 +54,7 @@ import java.util.stream.Collectors;
*
* TODO : currently not support multi-thread access.
*/
-public class KeyDataStreamOutput implements ByteBufferStreamOutput {
+public class KeyDataStreamOutput extends AbstractDataStreamOutput {
private OzoneClientConfig config;
@@ -79,34 +70,16 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
private boolean closed;
private FileEncryptionInfo feInfo;
- private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
- private int retryCount;
+
// how much of data is actually written yet to underlying stream
private long offset;
// how much data has been ingested into the stream
private long writeOffset;
- // whether an exception is encountered while write and whole write could
- // not succeed
- private boolean isException;
+
private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool;
private long clientID;
- /**
- * A constructor for testing purpose only.
- */
- @VisibleForTesting
- public KeyDataStreamOutput() {
- closed = false;
- this.retryPolicyMap = HddsClientUtils.getExceptionList()
- .stream()
- .collect(Collectors.toMap(Function.identity(),
- e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
- retryCount = 0;
- offset = 0;
- blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool();
- }
-
@VisibleForTesting
public List<BlockDataStreamOutputEntry> getStreamEntries() {
return blockDataStreamOutputEntryPool.getStreamEntries();
@@ -122,11 +95,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
return blockDataStreamOutputEntryPool.getLocationInfoList();
}
- @VisibleForTesting
- public int getRetryCount() {
- return retryCount;
- }
-
@VisibleForTesting
public long getClientID() {
return clientID;
@@ -142,6 +110,8 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion
) {
+ super(HddsClientUtils.getRetryPolicyByException(
+ config.getMaxRetryCount(), config.getRetryInterval()));
this.config = config;
OmKeyInfo info = handler.getKeyInfo();
blockDataStreamOutputEntryPool =
@@ -158,10 +128,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
// Retrieve the file encryption key info, null if file is not in
// encrypted bucket.
this.feInfo = info.getFileEncryptionInfo();
- this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
- config.getMaxRetryCount(), config.getRetryInterval());
- this.retryCount = 0;
- this.isException = false;
this.writeOffset = 0;
this.clientID = handler.getId();
}
@@ -322,9 +288,10 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
if (bufferedDataLen > 0) {
// If the data is still cached in the underlying stream, we need to
// allocate new block and write this data in the datanode.
- handleRetry(exception, bufferedDataLen);
+ handleRetry(exception);
+ handleWrite(null, 0, bufferedDataLen, true);
// reset the retryCount after handling the exception
- retryCount = 0;
+ resetRetryCount();
}
}
@@ -333,74 +300,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
closed = true;
}
- private void handleRetry(IOException exception, long len) throws IOException {
- RetryPolicy retryPolicy = retryPolicyMap
- .get(HddsClientUtils.checkForException(exception).getClass());
- if (retryPolicy == null) {
- retryPolicy = retryPolicyMap.get(Exception.class);
- }
- RetryPolicy.RetryAction action = null;
- try {
- action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
- } catch (Exception e) {
- setExceptionAndThrow(new IOException(e));
- }
- if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
- String msg = "";
- if (action.reason != null) {
- msg = "Retry request failed. " + action.reason;
- LOG.error(msg, exception);
- }
- setExceptionAndThrow(new IOException(msg, exception));
- }
-
- // Throw the exception if the thread is interrupted
- if (Thread.currentThread().isInterrupted()) {
- LOG.warn("Interrupted while trying for retry");
- setExceptionAndThrow(exception);
- }
- Preconditions.checkArgument(
- action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
- if (action.delayMillis > 0) {
- try {
- Thread.sleep(action.delayMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- IOException ioe = (IOException) new InterruptedIOException(
- "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
- .initCause(e);
- setExceptionAndThrow(ioe);
- }
- }
- retryCount++;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Retrying Write request. Already tried {} time(s); " +
- "retry policy is {} ", retryCount, retryPolicy);
- }
- handleWrite(null, 0, len, true);
- }
-
- private void setExceptionAndThrow(IOException ioe) throws IOException {
- isException = true;
- throw ioe;
- }
-
- /**
- * Checks if the provided exception signifies retry failure in ratis client.
- * In case of retry failure, ratis client throws RaftRetryFailureException
- * and all succeeding operations are failed with AlreadyClosedException.
- */
- private boolean checkForRetryFailure(Throwable t) {
- return t instanceof RaftRetryFailureException
- || t instanceof AlreadyClosedException;
- }
-
- // Every container specific exception from datatnode will be seen as
- // StorageContainerException
- private boolean checkIfContainerToExclude(Throwable t) {
- return t instanceof StorageContainerException;
- }
-
@Override
public void flush() throws IOException {
checkNotClosed();
@@ -485,7 +384,7 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
closed = true;
try {
handleFlushOrClose(StreamAction.CLOSE);
- if (!isException) {
+ if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
blockDataStreamOutputEntryPool.commitKey(offset);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 6225e25268..65f7348740 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -161,7 +161,7 @@ public class TestBlockDataStreamOutput {
private void testWrite(int dataLength) throws Exception {
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, 0);
+ keyName, ReplicationType.RATIS, dataLength);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
@@ -174,7 +174,7 @@ public class TestBlockDataStreamOutput {
private void testWriteWithFailure(int dataLength) throws Exception {
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, 0);
+ keyName, ReplicationType.RATIS, dataLength);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
index f4c756bccd..ad9eca6af7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
-
+import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -46,10 +46,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
@@ -74,6 +72,11 @@ public class TestContainerStateMachineStream {
private String volumeName;
private String bucketName;
+ private static final int CHUNK_SIZE = 100;
+ private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+ private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+ private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+
/**
* Create a MiniDFSCluster for testing.
*
@@ -118,8 +121,15 @@ public class TestContainerStateMachineStream {
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3)
+ .setHbInterval(200)
.setDataStreamMinPacketSize(1024)
+ .setBlockSize(BLOCK_SIZE)
+ .setChunkSize(CHUNK_SIZE)
+ .setStreamBufferFlushSize(FLUSH_SIZE)
+ .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
@@ -146,20 +156,14 @@ public class TestContainerStateMachineStream {
@Test
public void testContainerStateMachineForStreaming() throws Exception {
- long size = 1024 * 8;
+ long size = CHUNK_SIZE + 1;
OzoneDataStreamOutput key = TestHelper.createStreamKey(
"ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
volumeName, bucketName);
- byte[] data =
- ContainerTestHelper
- .getFixedLengthString(UUID.randomUUID().toString(),
- (int) (size / 2))
- .getBytes(UTF_8);
+ byte[] data = ContainerTestHelper.generateData((int) size, true);
key.write(ByteBuffer.wrap(data));
- key.write(ByteBuffer.wrap(data));
-
key.flush();
KeyDataStreamOutput streamOutput =
@@ -181,4 +185,35 @@ public class TestContainerStateMachineStream {
Assert.assertTrue(bytesUsed == size);
}
+
+ @Test
+ public void testContainerStateMachineForStreamingSmallFile()
+ throws Exception {
+ long size = CHUNK_SIZE - 1;
+
+ OzoneDataStreamOutput key = TestHelper.createStreamKey(
+ "ozone-stream-test-small-file.txt", ReplicationType.RATIS, size,
+ objectStore, volumeName, bucketName);
+
+ byte[] data = ContainerTestHelper.generateData((int) size, true);
+ key.write(ByteBuffer.wrap(data));
+ key.flush();
+
+ KeyDataStreamOutput streamOutput =
+ (KeyDataStreamOutput) key.getByteBufStreamOutput();
+ List<OmKeyLocationInfo> locationInfoList =
+ streamOutput.getLocationInfoList();
+ key.close();
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+
+ long bytesUsed = dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID()).
+ getContainerData().getBytesUsed();
+
+ Assert.assertTrue(bytesUsed == size);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org