Posted to commits@ozone.apache.org by sz...@apache.org on 2021/11/17 07:20:01 UTC
[ozone] branch HDDS-4454 updated: HDDS-5961. [Ozone-Streaming] update the usage space of Containers in the stream write (#2833)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new 869f943 HDDS-5961. [Ozone-Streaming] update the usage space of Containers in the stream write (#2833)
869f943 is described below
commit 869f94354b81bdfdf3b781c63b332fcb630b7a56
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Nov 17 15:19:47 2021 +0800
HDDS-5961. [Ozone-Streaming] update the usage space of Containers in the stream write (#2833)
---
.../container/common/impl/HddsDispatcher.java | 18 ++
.../common/interfaces/ContainerDispatcher.java | 10 ++
.../ozone/container/common/interfaces/Handler.java | 5 +
.../server/ratis/ContainerStateMachine.java | 21 ++-
.../transport/server/ratis/StreamDataChannel.java | 57 -------
.../ozone/container/keyvalue/KeyValueHandler.java | 12 ++
.../keyvalue/impl/ChunkManagerDispatcher.java | 10 ++
.../keyvalue/impl/FilePerBlockStrategy.java | 12 ++
.../keyvalue/impl/KeyValueStreamDataChannel.java | 90 ++++++++++
.../keyvalue/interfaces/ChunkManager.java | 8 +
.../src/main/proto/DatanodeClientProtocol.proto | 1 +
.../rpc/TestContainerStateMachineStream.java | 183 +++++++++++++++++++++
.../container/server/TestContainerServer.java | 1 +
13 files changed, 365 insertions(+), 63 deletions(-)
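
In short, the patch replaces the generic StreamDataChannel under transport/server/ratis with a KeyValue-specific KeyValueStreamDataChannel that is resolved through the regular container dispatch chain, so that streamed bytes are counted against the container. The new call path, reconstructed from the hunks below (annotation, not code from the patch):

    ContainerStateMachine.stream(request)
      -> HddsDispatcher.getStreamDataChannel(msg)        // resolves the Container
      -> KeyValueHandler.getStreamDataChannel(...)       // verifies the container is open
      -> ChunkManagerDispatcher / FilePerBlockStrategy   // locates the block file
      -> new KeyValueStreamDataChannel(file, containerData, metrics)

Every write on the resulting channel updates both the StreamWrite byte metrics and the container's bytesUsed.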
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 72a8107..5e73caf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@ import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -669,4 +670,21 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
default: return false;
}
}
+
+ @Override
+ public StateMachine.DataChannel getStreamDataChannel(
+ ContainerCommandRequestProto msg)
+ throws StorageContainerException {
+ long containerID = msg.getContainerID();
+ Container container = getContainer(containerID);
+ if (container != null) {
+ Handler handler = getHandler(getContainerType(container));
+ return handler.getStreamDataChannel(container, msg);
+ } else {
+ throw new StorageContainerException(
+ "ContainerID " + containerID + " does not exist",
+ ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ }
+ }
+
}
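
For context, a minimal caller sketch against the new dispatcher hook (hypothetical; only the getStreamDataChannel signature and the CONTAINER_NOT_FOUND behaviour come from this patch):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
    import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
    import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
    import org.apache.ratis.statemachine.StateMachine;

    static StateMachine.DataChannel openStreamChannel(
        ContainerDispatcher dispatcher, ContainerCommandRequestProto msg)
        throws StorageContainerException {
      // Throws CONTAINER_NOT_FOUND when the container id in msg is unknown,
      // per the HddsDispatcher implementation above.
      return dispatcher.getStreamDataChannel(msg);
    }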
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index a2e397d..d02bae0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.ratis.statemachine.StateMachine;
import java.util.Map;
@@ -84,4 +85,13 @@ public interface ContainerDispatcher {
* @param clusterId
*/
void setClusterId(String clusterId);
+
+ /**
+ * Get the StreamDataChannel for a stream upload.
+ */
+ default StateMachine.DataChannel getStreamDataChannel(
+ ContainerCommandRequestProto msg) throws StorageContainerException {
+ throw new UnsupportedOperationException(
+ "getStreamDataChannel not supported.");
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index e585234..201a1e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.Dispatche
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.ratis.statemachine.StateMachine;
/**
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -81,6 +82,10 @@ public abstract class Handler {
}
}
+ public abstract StateMachine.DataChannel getStreamDataChannel(
+ Container container, ContainerCommandRequestProto msg)
+ throws StorageContainerException;
+
/**
* Returns the Id of this datanode.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 9e4a9e9..5b59407 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,7 +23,6 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -78,6 +77,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -501,6 +501,19 @@ public class ContainerStateMachine extends BaseStateMachine {
return raftFuture;
}
+ private StateMachine.DataChannel getStreamDataChannel(
+ ContainerCommandRequestProto requestProto,
+ DispatcherContext context) throws StorageContainerException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
+ "traceID={}", gid, requestProto.getCmdType(),
+ requestProto.getContainerID(), requestProto.getPipelineID(),
+ requestProto.getTraceID());
+ }
+ runCommand(requestProto, context); // stream init
+ return dispatcher.getStreamDataChannel(requestProto);
+ }
+
@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
return CompletableFuture.supplyAsync(() -> {
@@ -512,11 +525,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
-
- ContainerCommandResponseProto response = runCommand(
- requestProto, context);
- final StreamDataChannel channel = new StreamDataChannel(
- Paths.get(response.getMessage()));
+ DataChannel channel = getStreamDataChannel(requestProto, context);
final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
getChunkExecutor(requestProto.getWriteChunk()) : null;
return new LocalStream(channel, chunkExecutor);
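
Net effect of this hunk: the StreamInit command is still executed first via runCommand(requestProto, context), but the channel is now resolved through the dispatcher instead of being a bare file channel built from the response message path. A condensed view of the resulting flow (illustrative, mirroring the code above):

    DataChannel channel = getStreamDataChannel(requestProto, context);
    // channel is a KeyValueStreamDataChannel, so every Ratis stream write
    // that lands in channel.write(ByteBuffer) also updates container usage.
    return new LocalStream(channel, chunkExecutor);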
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
deleted file mode 100644
index 3df66e2..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import org.apache.ratis.statemachine.StateMachine;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-
-class StreamDataChannel implements StateMachine.DataChannel {
- private final Path path;
- private final RandomAccessFile randomAccessFile;
-
- StreamDataChannel(Path path) throws FileNotFoundException {
- this.path = path;
- this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
- }
-
- @Override
- public void force(boolean metadata) throws IOException {
- randomAccessFile.getChannel().force(metadata);
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- return randomAccessFile.getChannel().write(src);
- }
-
- @Override
- public boolean isOpen() {
- return randomAccessFile.getChannel().isOpen();
- }
-
- @Override
- public void close() throws IOException {
- randomAccessFile.close();
- }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a2b82e5..a8202e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -105,6 +105,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
+import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,6 +177,17 @@ public class KeyValueHandler extends Handler {
}
@Override
+ public StateMachine.DataChannel getStreamDataChannel(
+ Container container, ContainerCommandRequestProto msg)
+ throws StorageContainerException {
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ checkContainerOpen(kvContainer);
+ BlockID blockID = BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID());
+ return chunkManager.getStreamDataChannel(kvContainer,
+ blockID, metrics);
+ }
+
+ @Override
public void stop() {
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 92d2606..a5f8535 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +83,14 @@ public class ChunkManagerDispatcher implements ChunkManager {
}
@Override
+ public StateMachine.DataChannel getStreamDataChannel(
+ Container container, BlockID blockID, ContainerMetrics metrics)
+ throws StorageContainerException {
+ return selectHandler(container)
+ .getStreamDataChannel(container, blockID, metrics);
+ }
+
+ @Override
public void finishWriteChunks(KeyValueContainer kvContainer,
BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 8b13c93..b510772 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -42,6 +43,7 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +100,16 @@ public class FilePerBlockStrategy implements ChunkManager {
}
@Override
+ public StateMachine.DataChannel getStreamDataChannel(
+ Container container, BlockID blockID, ContainerMetrics metrics)
+ throws StorageContainerException {
+ checkLayoutVersion(container);
+ File chunkFile = getChunkFile(container, blockID, null);
+ return new KeyValueStreamDataChannel(chunkFile,
+ container.getContainerData(), metrics);
+ }
+
+ @Override
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ChunkBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {
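
Design note: checkLayoutVersion guards that the container uses the FILE_PER_BLOCK layout, where one file holds the whole block. That is why getChunkFile can be called with a null ChunkInfo here; the target file is fully determined by the container and the BlockID:

    // FILE_PER_BLOCK: one file per block, so no chunk offset is needed to
    // open the stream channel (illustrative restatement of the code above).
    File blockFile = getChunkFile(container, blockID, /* chunkInfo */ null);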
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
new file mode 100644
index 0000000..c0570f5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * A StateMachine.DataChannel for streaming writes to a KeyValue container
+ * block file; each write also updates the container's usage statistics.
+ */
+class KeyValueStreamDataChannel implements StateMachine.DataChannel {
+ private final RandomAccessFile randomAccessFile;
+ private final File file;
+
+ private final ContainerData containerData;
+ private final ContainerMetrics metrics;
+
+ KeyValueStreamDataChannel(File file, ContainerData containerData,
+ ContainerMetrics metrics)
+ throws StorageContainerException {
+ try {
+ this.file = file;
+ this.randomAccessFile = new RandomAccessFile(file, "rw");
+ } catch (FileNotFoundException e) {
+ throw new StorageContainerException("Block file does not exist for " +
+ "containerID " + containerData.getContainerID() +
+ ", file " + file.getAbsolutePath(),
+ ContainerProtos.Result.IO_EXCEPTION);
+ }
+ this.containerData = containerData;
+ this.metrics = metrics;
+ }
+
+ @Override
+ public void force(boolean metadata) throws IOException {
+ randomAccessFile.getChannel().force(metadata);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int writeBytes = randomAccessFile.getChannel().write(src);
+ metrics
+ .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
+ containerData.updateWriteStats(writeBytes, false);
+ return writeBytes;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return randomAccessFile.getChannel().isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ randomAccessFile.close();
+ }
+
+ @Override
+ public String toString() {
+ return "KeyValueStreamDataChannel{" +
+ "File=" + file.getAbsolutePath() +
+ ", containerID=" + containerData.getContainerID() +
+ '}';
+ }
+}
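
The write path above is where the space accounting that HDDS-5961 asks for actually happens. An illustrative usage sequence, assuming a channel obtained via ChunkManager#getStreamDataChannel:

    java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(payload);
    int n = channel.write(buf);  // appends to the block file
    // side effects inside write(), per the code above:
    //   metrics.incContainerBytesStats(Type.StreamWrite, n);
    //   containerData.updateWriteStats(n, false);  // bytesUsed += n
    channel.force(false);        // fsync data only (metadata == false)
    channel.close();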
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index ba06eeb..7a64f07 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -109,6 +111,12 @@ public interface ChunkManager {
return null;
}
+ default StateMachine.DataChannel getStreamDataChannel(
+ Container container, BlockID blockID, ContainerMetrics metrics)
+ throws StorageContainerException {
+ return null;
+ }
+
static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
long defaultReadBufferCapacity) {
long bufferCapacity = 0;
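
Note the asymmetry between the two new default methods: ContainerDispatcher#getStreamDataChannel throws UnsupportedOperationException, while this default returns null, so a ChunkManager that never overrides it (only FilePerBlockStrategy does in this patch) silently yields no channel. A defensive caller sketch (hypothetical, not part of the patch):

    StateMachine.DataChannel ch =
        chunkManager.getStreamDataChannel(container, blockID, metrics);
    if (ch == null) {
      throw new StorageContainerException("Streaming is not supported by " +
          "this chunk layout", ContainerProtos.Result.UNSUPPORTED_REQUEST);
    }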
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 90ecbfc..05f453f 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -102,6 +102,7 @@ enum Type {
GetCommittedBlockLength = 18;
StreamInit = 19;
+ StreamWrite = 20;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
new file mode 100644
index 0000000..3b17450
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests the ContainerStateMachine stream handling.
+ */
+public class TestContainerStateMachineStream {
+
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = Timeout.seconds(300);
+
+ private MiniOzoneCluster cluster;
+ private OzoneConfiguration conf = new OzoneConfiguration();
+ private OzoneClient client;
+ private ObjectStore objectStore;
+ private String volumeName;
+ private String bucketName;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void setup() throws Exception {
+ conf = new OzoneConfiguration();
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1,
+ TimeUnit.SECONDS);
+
+ RatisClientConfig ratisClientConfig =
+ conf.getObject(RatisClientConfig.class);
+ ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
+ ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
+ conf.setFromObject(ratisClientConfig);
+
+ DatanodeRatisServerConfig ratisServerConfig =
+ conf.getObject(DatanodeRatisServerConfig.class);
+ ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+ ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
+ conf.setFromObject(ratisServerConfig);
+
+ RatisClientConfig.RaftConfig raftClientConfig =
+ conf.getObject(RatisClientConfig.RaftConfig.class);
+ raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+ raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+ conf.setFromObject(raftClientConfig);
+
+ conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+ conf.setQuietMode(false);
+ cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+ .build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
+ // the easiest way to create an open container is to create a key
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+
+ volumeName = "testcontainerstatemachinestream";
+ bucketName = "teststreambucket";
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+
+ }
+
+ /**
+ * Shutdown the MiniOzoneCluster.
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testContainerStateMachineForStreaming() throws Exception {
+ long size = 1024 * 8;
+
+ OzoneDataStreamOutput key = TestHelper.createStreamKey(
+ "ozone-stream-test.txt", ReplicationType.RATIS, size, objectStore,
+ volumeName, bucketName);
+
+ byte[] data =
+ ContainerTestHelper
+ .getFixedLengthString(UUID.randomUUID().toString(),
+ (int) (size / 2))
+ .getBytes(UTF_8);
+ key.write(ByteBuffer.wrap(data));
+ key.write(ByteBuffer.wrap(data));
+
+ key.flush();
+
+ KeyDataStreamOutput streamOutput =
+ (KeyDataStreamOutput) key.getByteBufStreamOutput();
+ List<OmKeyLocationInfo> locationInfoList =
+ streamOutput.getLocationInfoList();
+
+ key.close();
+
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo,
+ cluster);
+
+ long bytesUsed = dn.getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData().getBytesUsed();
+
+ Assert.assertEquals(size, bytesUsed);
+ }
+
+}
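
To run just this new integration test locally, the standard Surefire test selection should work (module path taken from the diff header; exact profile flags may vary by branch):

    mvn -pl hadoop-ozone/integration-test test \
        -Dtest=TestContainerStateMachineStream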
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 29f19eb..34055d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -65,6 +65,7 @@ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanode
import org.apache.ratis.rpc.RpcType;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
+
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.Assert;
import org.junit.BeforeClass;