You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:20 UTC

[ozone] 15/35: HDDS-5961. [Ozone-Streaming] update the usage space of Containers in the stream write (#2833)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit bb41750fdd4945699a6dfe7390f3457bfe28fd06
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Nov 17 15:19:47 2021 +0800

    HDDS-5961. [Ozone-Streaming] update the usage space of Containers in the stream write (#2833)
---
 .../container/common/impl/HddsDispatcher.java      |  18 ++
 .../common/interfaces/ContainerDispatcher.java     |  10 ++
 .../ozone/container/common/interfaces/Handler.java |   5 +
 .../server/ratis/ContainerStateMachine.java        |  21 ++-
 .../transport/server/ratis/StreamDataChannel.java  |  57 -------
 .../ozone/container/keyvalue/KeyValueHandler.java  |  12 ++
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  10 ++
 .../keyvalue/impl/FilePerBlockStrategy.java        |  12 ++
 .../keyvalue/impl/KeyValueStreamDataChannel.java   |  90 ++++++++++
 .../keyvalue/interfaces/ChunkManager.java          |   8 +
 .../src/main/proto/DatanodeClientProtocol.proto    |   1 +
 .../rpc/TestContainerStateMachineStream.java       | 183 +++++++++++++++++++++
 12 files changed, 364 insertions(+), 63 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 2e0f1de496..e81308a4f3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@ import com.google.protobuf.ServiceException;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -675,4 +676,21 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     default: return false;
     }
   }
+
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+          ContainerCommandRequestProto msg)
+          throws StorageContainerException {
+    long containerID = msg.getContainerID();
+    Container container = getContainer(containerID);
+    if (container != null) {
+      Handler handler = getHandler(getContainerType(container));
+      return handler.getStreamDataChannel(container, msg);
+    } else {
+      throw new StorageContainerException(
+              "ContainerID " + containerID + " does not exist",
+              ContainerProtos.Result.CONTAINER_NOT_FOUND);
+    }
+  }
+
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index a2e397d546..d02bae0a35 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.ratis.statemachine.StateMachine;
 
 import java.util.Map;
 
@@ -84,4 +85,13 @@ public interface ContainerDispatcher {
    * @param clusterId
    */
   void setClusterId(String clusterId);
+
+  /**
+   * When uploading using stream, get StreamDataChannel.
+   */
+  default StateMachine.DataChannel getStreamDataChannel(
+      ContainerCommandRequestProto msg) throws StorageContainerException {
+    throw new UnsupportedOperationException(
+        "getStreamDataChannel not supported.");
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 1dbd588d33..7401aa545d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.ratis.statemachine.StateMachine;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -80,6 +81,10 @@ public abstract class Handler {
     }
   }
 
+  public abstract StateMachine.DataChannel getStreamDataChannel(
+          Container container, ContainerCommandRequestProto msg)
+          throws StorageContainerException;
+
   /**
    * Returns the Id of this datanode.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index ed5659d1d7..e06d9f9fc0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collection;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -82,6 +81,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -513,6 +513,19 @@ public class ContainerStateMachine extends BaseStateMachine {
     return raftFuture;
   }
 
+  private StateMachine.DataChannel getStreamDataChannel(
+          ContainerCommandRequestProto requestProto,
+          DispatcherContext context) throws StorageContainerException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
+                      "traceID={}", gid, requestProto.getCmdType(),
+              requestProto.getContainerID(), requestProto.getPipelineID(),
+              requestProto.getTraceID());
+    }
+    runCommand(requestProto, context);  // stream init
+    return dispatcher.getStreamDataChannel(requestProto);
+  }
+
   @Override
   public CompletableFuture<DataStream> stream(RaftClientRequest request) {
     return CompletableFuture.supplyAsync(() -> {
@@ -524,11 +537,7 @@ public class ContainerStateMachine extends BaseStateMachine {
                 .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
                 .setContainer2BCSIDMap(container2BCSIDMap)
                 .build();
-
-        ContainerCommandResponseProto response = runCommand(
-            requestProto, context);
-        final StreamDataChannel channel = new StreamDataChannel(
-            Paths.get(response.getMessage()));
+        DataChannel channel = getStreamDataChannel(requestProto, context);
         final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
             getChunkExecutor(requestProto.getWriteChunk()) : null;
         return new LocalStream(channel, chunkExecutor);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
deleted file mode 100644
index 3df66e26dc..0000000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import org.apache.ratis.statemachine.StateMachine;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-
-class StreamDataChannel implements StateMachine.DataChannel {
-  private final Path path;
-  private final RandomAccessFile randomAccessFile;
-
-  StreamDataChannel(Path path) throws FileNotFoundException {
-    this.path = path;
-    this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
-  }
-
-  @Override
-  public void force(boolean metadata) throws IOException {
-    randomAccessFile.getChannel().force(metadata);
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    return randomAccessFile.getChannel().write(src);
-  }
-
-  @Override
-  public boolean isOpen() {
-    return randomAccessFile.getChannel().isOpen();
-  }
-
-  @Override
-  public void close() throws IOException {
-    randomAccessFile.close();
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 44e5f38a90..c72d789909 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -105,6 +105,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,6 +179,17 @@ public class KeyValueHandler extends Handler {
     return volumeChoosingPolicy;
   }
 
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+          Container container, ContainerCommandRequestProto msg)
+          throws StorageContainerException {
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    checkContainerOpen(kvContainer);
+    BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+    return chunkManager.getStreamDataChannel(kvContainer,
+            blockID, metrics);
+  }
+
   @Override
   public void stop() {
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 3e2ab46470..92f6327447 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -33,6 +34,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +81,14 @@ public class ChunkManagerDispatcher implements ChunkManager {
         .streamInit(container, blockID);
   }
 
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+          Container container, BlockID blockID, ContainerMetrics metrics)
+          throws StorageContainerException {
+    return selectHandler(container)
+            .getStreamDataChannel(container, blockID, metrics);
+  }
+
   @Override
   public void finishWriteChunks(KeyValueContainer kvContainer,
       BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 9efc6bc351..23db342da0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -42,6 +43,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +99,16 @@ public class FilePerBlockStrategy implements ChunkManager {
     return chunkFile.getAbsolutePath();
   }
 
+  @Override
+  public StateMachine.DataChannel getStreamDataChannel(
+          Container container, BlockID blockID, ContainerMetrics metrics)
+          throws StorageContainerException {
+    checkLayoutVersion(container);
+    File chunkFile = getChunkFile(container, blockID, null);
+    return new KeyValueStreamDataChannel(chunkFile,
+        container.getContainerData(), metrics);
+  }
+
   @Override
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       ChunkBuffer data, DispatcherContext dispatcherContext)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
new file mode 100644
index 0000000000..c0570f5d4d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class KeyValueStreamDataChannel implements StateMachine.DataChannel {
+  private final RandomAccessFile randomAccessFile;
+  private final File file;
+
+  private final ContainerData containerData;
+  private final ContainerMetrics metrics;
+
+  KeyValueStreamDataChannel(File file, ContainerData containerData,
+                            ContainerMetrics metrics)
+      throws StorageContainerException {
+    try {
+      this.file = file;
+      this.randomAccessFile = new RandomAccessFile(file, "rw");
+    } catch (FileNotFoundException e) {
+      throw new StorageContainerException("BlockFile not exists with " +
+          "container Id " + containerData.getContainerID() +
+          " file " + file.getAbsolutePath(),
+          ContainerProtos.Result.IO_EXCEPTION);
+    }
+    this.containerData = containerData;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void force(boolean metadata) throws IOException {
+    randomAccessFile.getChannel().force(metadata);
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int writeBytes = randomAccessFile.getChannel().write(src);
+    metrics
+        .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
+    containerData.updateWriteStats(writeBytes, false);
+    return writeBytes;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return randomAccessFile.getChannel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    randomAccessFile.close();
+  }
+
+  @Override
+  public String toString() {
+    return "KeyValueStreamDataChannel{" +
+        "File=" + file.getAbsolutePath() +
+        ", containerID=" + containerData.getContainerID() +
+        '}';
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index ba06eebd69..7a64f07628 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -109,6 +111,12 @@ public interface ChunkManager {
     return null;
   }
 
+  default StateMachine.DataChannel getStreamDataChannel(
+          Container container, BlockID blockID, ContainerMetrics metrics)
+          throws StorageContainerException {
+    return null;
+  }
+
   static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
       long defaultReadBufferCapacity) {
     long bufferCapacity = 0;
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 62e8b9a55e..b87a0e8df5 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -102,6 +102,7 @@ enum Type {
   GetCommittedBlockLength = 18;
 
   StreamInit = 19;
+  StreamWrite = 20;
 }
 
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
new file mode 100644
index 0000000000..3b17450376
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests the ContainerStateMachine stream handling.
+ */
+public class TestContainerStateMachineStream {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private String volumeName;
+  private String bucketName;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception if cluster or client setup fails
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
+        TimeUnit.SECONDS);
+
+    RatisClientConfig ratisClientConfig =
+        conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
+    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
+    conf.setFromObject(ratisClientConfig);
+
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+    conf.setFromObject(raftClientConfig);
+
+    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+            .build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+
+    volumeName = "testcontainerstatemachinestream";
+    bucketName = "teststreambucket";
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+
+  }
+
+  /**
+   * Shut down the MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testContainerStateMachineForStreaming() throws Exception {
+    long size = 1024 * 8;
+
+    OzoneDataStreamOutput key = TestHelper.createStreamKey(
+        "ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
+        volumeName, bucketName);
+
+    byte[] data =
+        ContainerTestHelper
+            .getFixedLengthString(UUID.randomUUID().toString(),
+                (int) (size / 2))
+            .getBytes(UTF_8);
+    key.write(ByteBuffer.wrap(data));
+    key.write(ByteBuffer.wrap(data));
+
+    key.flush();
+
+    KeyDataStreamOutput streamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    List<OmKeyLocationInfo> locationInfoList =
+        streamOutput.getLocationInfoList();
+
+    key.close();
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+        cluster);
+
+    long bytesUsed = dn.getDatanodeStateMachine()
+        .getContainer().getContainerSet()
+        .getContainer(omKeyLocationInfo.getContainerID()).
+            getContainerData().getBytesUsed();
+
+    Assert.assertTrue(bytesUsed == size);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org