You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:38 UTC

[ozone] 33/35: HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b23fbcf9bd9d812912e154a93c726ed134914c84
Author: hao guo <gu...@360.cn>
AuthorDate: Mon Mar 28 16:16:31 2022 +0800

    HDDS-6137. [Ozone-Streaming] Refactor KeyDataStreamOutput. (#3195)
---
 .../hdds/scm/storage/AbstractDataStreamOutput.java | 130 +++++++++++++++++++++
 .../hdds/scm/storage/BlockDataStreamOutput.java    |  36 +-----
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  32 +++++
 .../ozone/container/ContainerTestHelper.java       |  12 ++
 .../server/ratis/ContainerStateMachine.java        |  35 ++++--
 .../ozone/container/keyvalue/KeyValueHandler.java  |  33 +++---
 .../keyvalue/impl/KeyValueStreamDataChannel.java   |   2 +-
 .../client/io/BlockDataStreamOutputEntryPool.java  |  26 -----
 .../ozone/client/io/KeyDataStreamOutput.java       | 121 ++-----------------
 .../client/rpc/TestBlockDataStreamOutput.java      |   4 +-
 .../rpc/TestContainerStateMachineStream.java       |  59 ++++++++--
 11 files changed, 279 insertions(+), 211 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
new file mode 100644
index 0000000000..e29670d781
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Map;
+
+/**
+ * Base class of data stream outputs holding the shared retry/error handling.
+ */
+public abstract class AbstractDataStreamOutput
+    implements ByteBufferStreamOutput {
+
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;  // retries performed since the last reset
+  private boolean isException;  // set once setExceptionAndThrow has fired
+
+  protected AbstractDataStreamOutput(
+      Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap) {
+    this.retryPolicyMap = retryPolicyMap;
+    this.isException = false;
+    this.retryCount = 0;
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  protected void resetRetryCount() {
+    retryCount = 0;
+  }
+
+  protected boolean isException() {
+    return isException;
+  }
+
+  /**
+   * Checks if the provided exception signifies retry failure in ratis client.
+   * In case of retry failure, ratis client throws RaftRetryFailureException
+   * and all succeeding operations are failed with AlreadyClosedException.
+   */
+  protected boolean checkForRetryFailure(Throwable t) {
+    return t instanceof RaftRetryFailureException
+        || t instanceof AlreadyClosedException;
+  }
+
+  // Every container specific exception from a datanode will be seen as
+  // StorageContainerException
+  protected boolean checkIfContainerToExclude(Throwable t) {
+    return t instanceof StorageContainerException;
+  }
+
+  protected void setExceptionAndThrow(IOException ioe) throws IOException {
+    isException = true;  // remembered so isException() reports the failure
+    throw ioe;
+  }
+
+  protected void handleRetry(IOException exception) throws IOException {
+    RetryPolicy retryPolicy = retryPolicyMap
+        .get(HddsClientUtils.checkForException(exception).getClass());
+    if (retryPolicy == null) {  // fall back to the Exception.class entry
+      retryPolicy = retryPolicyMap.get(Exception.class);
+    }
+    handleRetry(exception, retryPolicy);
+  }
+
+  protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
+      throws IOException {
+    RetryPolicy.RetryAction action = null;
+    try {
+      action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
+    } catch (Exception e) {
+      setExceptionAndThrow(new IOException(e));
+    }
+    if (action != null &&
+        action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+      String msg = "";
+      if (action.reason != null) {
+        msg = "Retry request failed. " + action.reason;
+      }
+      setExceptionAndThrow(new IOException(msg, exception));
+    }
+
+    // Rethrow the original exception if the thread is already interrupted
+    if (Thread.currentThread().isInterrupted()) {
+      setExceptionAndThrow(exception);
+    }
+    Preconditions.checkNotNull(action);
+    Preconditions.checkArgument(
+        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+    if (action.delayMillis > 0) {
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();  // keep the interrupt flag set
+        IOException ioe = (IOException) new InterruptedIOException(
+            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+            .initCause(e);
+        setExceptionAndThrow(ioe);
+      }
+    }
+    retryCount++;  // counted only when the retry may proceed
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 3df5eb0e12..d5b9dd9d81 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -44,8 +45,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RoutingTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -208,44 +207,13 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     if (isDatastreamPipelineMode) {
       return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
           .stream(message.getContent().asReadOnlyByteBuffer(),
-              getRoutingTable(pipeline));
+              RatisHelper.getRoutingTable(pipeline));
     } else {
       return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
           .stream(message.getContent().asReadOnlyByteBuffer());
     }
   }
 
-  public RoutingTable getRoutingTable(Pipeline pipeline) {
-    RaftPeerId primaryId = null;
-    List<RaftPeerId> raftPeers = new ArrayList<>();
-
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
-      try {
-        if (dn == pipeline.getFirstNode()) {
-          primaryId = raftPeerId;
-        }
-      } catch (IOException e) {
-        LOG.error("Can not get FirstNode from the pipeline: {} with " +
-            "exception: {}", pipeline.toString(), e.getLocalizedMessage());
-        return null;
-      }
-      raftPeers.add(raftPeerId);
-    }
-
-    RoutingTable.Builder builder = RoutingTable.newBuilder();
-    RaftPeerId previousId = primaryId;
-    for (RaftPeerId peerId : raftPeers) {
-      if (peerId.equals(primaryId)) {
-        continue;
-      }
-      builder.addSuccessor(previousId, peerId);
-      previousId = peerId;
-    }
-
-    return builder.build();
-  }
-
   public BlockID getBlockID() {
     return blockID.get();
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index ca3170985e..b381d7a3a1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -53,6 +53,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -391,6 +392,37 @@ public final class RatisHelper {
         .min(Long::compareTo).orElse(null);
   }
 
+  public static RoutingTable getRoutingTable(Pipeline pipeline) {
+    RaftPeerId primaryId = null;
+    List<RaftPeerId> raftPeers = new ArrayList<>();
+
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
+      try {
+        if (dn.equals(pipeline.getFirstNode())) {  // value equality, not ==
+          primaryId = raftPeerId;
+        }
+      } catch (IOException e) {
+        LOG.error("Can not get FirstNode from the pipeline: {} with " +
+            "exception: {}", pipeline, e.getLocalizedMessage(), e);
+        return null;  // callers receive null when no first node is available
+      }
+      raftPeers.add(raftPeerId);
+    }
+
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeerId previousId = primaryId;  // the chain starts at the primary
+    for (RaftPeerId peerId : raftPeers) {
+      if (peerId.equals(primaryId)) {
+        continue;  // the primary heads the chain, it is never a successor
+      }
+      builder.addSuccessor(previousId, peerId);
+      previousId = peerId;
+    }
+
+    return builder.build();
+  }
+
   private static <U> Class<? extends U> getClass(String name,
       Class<U> xface) {
     try {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index d90ae6173c..d91c2e9116 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -594,6 +594,18 @@ public final class ContainerTestHelper {
     return String.format("%1$" + length + "s", string);
   }
 
+  public static byte[] generateData(int length, boolean random) {
+    final byte[] data = new byte[length];
+    if (random) {
+      ThreadLocalRandom.current().nextBytes(data);  // random payload
+    } else {
+      for (int i = 0; i < length; i++) {
+        data[i] = (byte) i;  // deterministic: low 8 bits of the index
+      }
+    }
+    return data;
+  }
+
   /**
    * Construct fake protobuf messages for various types of requests.
    * This is tedious, however necessary to test. Protobuf classes are final
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index e06d9f9fc0..138cddd486 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
 
@@ -549,19 +550,29 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
-    return CompletableFuture.supplyAsync(() -> {
-      if (stream == null) {
-        return JavaUtils.completeExceptionally(
-            new IllegalStateException("DataStream is null"));
-      }
-      if (stream.getDataChannel().isOpen()) {
-        return JavaUtils.completeExceptionally(
-            new IllegalStateException(
-                "DataStream: " + stream + " is not closed properly"));
-      } else {
-        return CompletableFuture.completedFuture(null);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "DataStream is null"));
+    }
+    final DataChannel dataChannel = stream.getDataChannel();
+    if (dataChannel.isOpen()) {  // the stream must be closed before linking
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "DataStream: " + stream + " is not closed properly"));
+    }
+
+    final CompletableFuture<ContainerCommandResponseProto> f;
+    if (dataChannel instanceof KeyValueStreamDataChannel) {
+      f = CompletableFuture.completedFuture(null);  // result is null here
+    } else {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "Unexpected DataChannel " + dataChannel.getClass()));
+    }
+    return f.whenComplete((res, e) -> {  // res may be null: guard the log
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PutBlock {} Term: {} Index: {}", res == null ? null
+            : res.getResult(), entry.getTerm(), entry.getIndex());
       }
-    }, executor);
+    });
   }
 
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index c72d789909..82e52b3d19 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -181,13 +181,21 @@ public class KeyValueHandler extends Handler {
 
   @Override
   public StateMachine.DataChannel getStreamDataChannel(
-          Container container, ContainerCommandRequestProto msg)
-          throws StorageContainerException {
+      Container container, ContainerCommandRequestProto msg)
+      throws StorageContainerException {
     KeyValueContainer kvContainer = (KeyValueContainer) container;
     checkContainerOpen(kvContainer);
-    BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
-    return chunkManager.getStreamDataChannel(kvContainer,
-            blockID, metrics);
+
+    if (msg.hasWriteChunk()) {  // block ID is read from the WriteChunk part
+      BlockID blockID =
+          BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+
+      return chunkManager.getStreamDataChannel(kvContainer,
+          blockID, metrics);
+    } else {  // no WriteChunk payload: reject the request
+      throw new StorageContainerException("Malformed request.",
+          ContainerProtos.Result.IO_EXCEPTION);
+    }
   }
 
   @Override
@@ -268,10 +276,14 @@ public class KeyValueHandler extends Handler {
   ContainerCommandResponseProto handleStreamInit(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer,
       DispatcherContext dispatcherContext) {
-    if (!request.hasWriteChunk()) {
+    final BlockID blockID;
+    if (request.hasWriteChunk()) {
+      WriteChunkRequestProto writeChunk = request.getWriteChunk();
+      blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
+    } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Malformed Write Chunk request. trace ID: {}",
-            request.getTraceID());
+        LOG.debug("Malformed {} request. trace ID: {}",
+            request.getCmdType(), request.getTraceID());
       }
       return malformedRequest(request);
     }
@@ -279,13 +291,8 @@ public class KeyValueHandler extends Handler {
     String path = null;
     try {
       checkContainerOpen(kvContainer);
-
-      WriteChunkRequestProto writeChunk = request.getWriteChunk();
-      BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
-
       path = chunkManager
           .streamInit(kvContainer, blockID);
-
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 14ead4ea86..66723031f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -28,7 +28,7 @@ import java.io.File;
 /**
  * This class is used to get the DataChannel for streaming.
  */
-class KeyValueStreamDataChannel extends StreamDataChannelBase {
+public class KeyValueStreamDataChannel extends StreamDataChannelBase {
   KeyValueStreamDataChannel(File file, ContainerData containerData,
                             ContainerMetrics metrics)
       throws StorageContainerException {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 00cda7844a..e51242cc10 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -18,10 +18,8 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -88,30 +86,6 @@ public class BlockDataStreamOutputEntryPool {
     this.bufferList = new ArrayList<>();
   }
 
-  /**
-   * A constructor for testing purpose only.
-   *
-   * @see KeyDataStreamOutput#KeyDataStreamOutput()
-   */
-  @VisibleForTesting
-  BlockDataStreamOutputEntryPool() {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    keyArgs = null;
-    xceiverClientFactory = null;
-    config =
-        new OzoneConfiguration().getObject(OzoneClientConfig.class);
-    config.setStreamBufferSize(0);
-    config.setStreamBufferMaxSize(0);
-    config.setStreamBufferFlushSize(0);
-    config.setStreamBufferFlushDelay(false);
-    requestID = null;
-    int chunkSize = 0;
-    currentStreamIndex = 0;
-    openID = -1;
-    excludeList = new ExcludeList();
-  }
-
   /**
    * When a key is opened, it is possible that there are some blocks already
    * allocated to it for this open session. In this case, to make use of these
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 2540e42e24..dc5c3a016d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -28,31 +28,22 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.hdds.scm.storage.AbstractDataStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
-import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * Maintaining a list of BlockInputStream. Write based on offset.
@@ -63,7 +54,7 @@ import java.util.stream.Collectors;
  *
  * TODO : currently not support multi-thread access.
  */
-public class KeyDataStreamOutput implements ByteBufferStreamOutput {
+public class KeyDataStreamOutput extends AbstractDataStreamOutput {
 
   private OzoneClientConfig config;
 
@@ -79,34 +70,16 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
 
   private boolean closed;
   private FileEncryptionInfo feInfo;
-  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
-  private int retryCount;
+
   // how much of data is actually written yet to underlying stream
   private long offset;
   // how much data has been ingested into the stream
   private long writeOffset;
-  // whether an exception is encountered while write and whole write could
-  // not succeed
-  private boolean isException;
+
   private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool;
 
   private long clientID;
 
-  /**
-   * A constructor for testing purpose only.
-   */
-  @VisibleForTesting
-  public KeyDataStreamOutput() {
-    closed = false;
-    this.retryPolicyMap = HddsClientUtils.getExceptionList()
-        .stream()
-        .collect(Collectors.toMap(Function.identity(),
-            e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
-    retryCount = 0;
-    offset = 0;
-    blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool();
-  }
-
   @VisibleForTesting
   public List<BlockDataStreamOutputEntry> getStreamEntries() {
     return blockDataStreamOutputEntryPool.getStreamEntries();
@@ -122,11 +95,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     return blockDataStreamOutputEntryPool.getLocationInfoList();
   }
 
-  @VisibleForTesting
-  public int getRetryCount() {
-    return retryCount;
-  }
-
   @VisibleForTesting
   public long getClientID() {
     return clientID;
@@ -142,6 +110,8 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
       String uploadID, int partNumber, boolean isMultipart,
       boolean unsafeByteBufferConversion
   ) {
+    super(HddsClientUtils.getRetryPolicyByException(
+        config.getMaxRetryCount(), config.getRetryInterval()));
     this.config = config;
     OmKeyInfo info = handler.getKeyInfo();
     blockDataStreamOutputEntryPool =
@@ -158,10 +128,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
     this.feInfo = info.getFileEncryptionInfo();
-    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
-        config.getMaxRetryCount(), config.getRetryInterval());
-    this.retryCount = 0;
-    this.isException = false;
     this.writeOffset = 0;
     this.clientID = handler.getId();
   }
@@ -322,9 +288,10 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     if (bufferedDataLen > 0) {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
-      handleRetry(exception, bufferedDataLen);
+      handleRetry(exception);
+      handleWrite(null, 0, bufferedDataLen, true);
       // reset the retryCount after handling the exception
-      retryCount = 0;
+      resetRetryCount();
     }
   }
 
@@ -333,74 +300,6 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     closed = true;
   }
 
-  private void handleRetry(IOException exception, long len) throws IOException {
-    RetryPolicy retryPolicy = retryPolicyMap
-        .get(HddsClientUtils.checkForException(exception).getClass());
-    if (retryPolicy == null) {
-      retryPolicy = retryPolicyMap.get(Exception.class);
-    }
-    RetryPolicy.RetryAction action = null;
-    try {
-      action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
-    } catch (Exception e) {
-      setExceptionAndThrow(new IOException(e));
-    }
-    if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-      String msg = "";
-      if (action.reason != null) {
-        msg = "Retry request failed. " + action.reason;
-        LOG.error(msg, exception);
-      }
-      setExceptionAndThrow(new IOException(msg, exception));
-    }
-
-    // Throw the exception if the thread is interrupted
-    if (Thread.currentThread().isInterrupted()) {
-      LOG.warn("Interrupted while trying for retry");
-      setExceptionAndThrow(exception);
-    }
-    Preconditions.checkArgument(
-        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
-    if (action.delayMillis > 0) {
-      try {
-        Thread.sleep(action.delayMillis);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        IOException ioe =  (IOException) new InterruptedIOException(
-            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
-            .initCause(e);
-        setExceptionAndThrow(ioe);
-      }
-    }
-    retryCount++;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Retrying Write request. Already tried {} time(s); " +
-          "retry policy is {} ", retryCount, retryPolicy);
-    }
-    handleWrite(null, 0, len, true);
-  }
-
-  private void setExceptionAndThrow(IOException ioe) throws IOException {
-    isException = true;
-    throw ioe;
-  }
-
-  /**
-   * Checks if the provided exception signifies retry failure in ratis client.
-   * In case of retry failure, ratis client throws RaftRetryFailureException
-   * and all succeeding operations are failed with AlreadyClosedException.
-   */
-  private boolean checkForRetryFailure(Throwable t) {
-    return t instanceof RaftRetryFailureException
-        || t instanceof AlreadyClosedException;
-  }
-
-  // Every container specific exception from datatnode will be seen as
-  // StorageContainerException
-  private boolean checkIfContainerToExclude(Throwable t) {
-    return t instanceof StorageContainerException;
-  }
-
   @Override
   public void flush() throws IOException {
     checkNotClosed();
@@ -485,7 +384,7 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     closed = true;
     try {
       handleFlushOrClose(StreamAction.CLOSE);
-      if (!isException) {
+      if (!isException()) {
         Preconditions.checkArgument(writeOffset == offset);
       }
       blockDataStreamOutputEntryPool.commitKey(offset);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 6225e25268..65f7348740 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -161,7 +161,7 @@ public class TestBlockDataStreamOutput {
   private void testWrite(int dataLength) throws Exception {
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, 0);
+        keyName, ReplicationType.RATIS, dataLength);
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
@@ -174,7 +174,7 @@ public class TestBlockDataStreamOutput {
   private void testWriteWithFailure(int dataLength) throws Exception {
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
-        keyName, ReplicationType.RATIS, 0);
+        keyName, ReplicationType.RATIS, dataLength);
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
index f4c756bccd..ad9eca6af7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -17,7 +17,7 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -46,10 +46,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
@@ -74,6 +72,11 @@ public class TestContainerStateMachineStream {
   private String volumeName;
   private String bucketName;
 
+  private static final int CHUNK_SIZE = 100;
+  private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+  private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+  private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+
   /**
    * Create a MiniDFSCluster for testing.
    *
@@ -118,8 +121,15 @@ public class TestContainerStateMachineStream {
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(3)
+            .setHbInterval(200)
             .setDataStreamMinPacketSize(1024)
+            .setBlockSize(BLOCK_SIZE)
+            .setChunkSize(CHUNK_SIZE)
+            .setStreamBufferFlushSize(FLUSH_SIZE)
+            .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+            .setStreamBufferSizeUnit(StorageUnit.BYTES)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
@@ -146,20 +156,14 @@ public class TestContainerStateMachineStream {
 
   @Test
   public void testContainerStateMachineForStreaming() throws Exception {
-    long size = 1024 * 8;
+    long size = CHUNK_SIZE + 1;
 
     OzoneDataStreamOutput key = TestHelper.createStreamKey(
         "ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
         volumeName, bucketName);
 
-    byte[] data =
-        ContainerTestHelper
-            .getFixedLengthString(UUID.randomUUID().toString(),
-                (int) (size / 2))
-            .getBytes(UTF_8);
+    byte[] data = ContainerTestHelper.generateData((int) size, true);
     key.write(ByteBuffer.wrap(data));
-    key.write(ByteBuffer.wrap(data));
-
     key.flush();
 
     KeyDataStreamOutput streamOutput =
@@ -181,4 +185,35 @@ public class TestContainerStateMachineStream {
     Assert.assertTrue(bytesUsed == size);
   }
 
+
+  @Test
+  public void testContainerStateMachineForStreamingSmallFile()
+      throws Exception {
+    long size = CHUNK_SIZE - 1;  // one byte less than a single chunk
+
+    OzoneDataStreamOutput key = TestHelper.createStreamKey(
+        "ozone-stream-test-small-file.txt", ReplicationType.RATIS, size,
+        objectStore, volumeName, bucketName);
+
+    byte[] data = ContainerTestHelper.generateData((int) size, true);
+    key.write(ByteBuffer.wrap(data));
+    key.flush();
+
+    KeyDataStreamOutput streamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    List<OmKeyLocationInfo> locationInfoList =
+        streamOutput.getLocationInfoList();
+    key.close();
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+        cluster);
+
+    long bytesUsed = dn.getDatanodeStateMachine()
+        .getContainer().getContainerSet()
+        .getContainer(omKeyLocationInfo.getContainerID()).
+            getContainerData().getBytesUsed();
+
+    Assert.assertEquals(size, bytesUsed);  // reports both values on failure
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org