You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/07/04 12:54:18 UTC
[ozone] 28/38: HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3023)
This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 3cc2a7e21be12d0a88897a4a95fc5f80f37cd100
Author: hao guo <gu...@360.cn>
AuthorDate: Tue Feb 15 23:35:46 2022 +0800
HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3023)
---
.../keyvalue/impl/KeyValueStreamDataChannel.java | 56 ++--------------------
...DataChannel.java => StreamDataChannelBase.java} | 39 ++++++++-------
2 files changed, 27 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index c0570f5d4d..14ead4ea86 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -22,69 +22,21 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.ratis.statemachine.StateMachine;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
/**
* This class is used to get the DataChannel for streaming.
*/
-class KeyValueStreamDataChannel implements StateMachine.DataChannel {
- private final RandomAccessFile randomAccessFile;
- private final File file;
-
- private final ContainerData containerData;
- private final ContainerMetrics metrics;
-
+class KeyValueStreamDataChannel extends StreamDataChannelBase {
KeyValueStreamDataChannel(File file, ContainerData containerData,
ContainerMetrics metrics)
throws StorageContainerException {
- try {
- this.file = file;
- this.randomAccessFile = new RandomAccessFile(file, "rw");
- } catch (FileNotFoundException e) {
- throw new StorageContainerException("BlockFile not exists with " +
- "container Id " + containerData.getContainerID() +
- " file " + file.getAbsolutePath(),
- ContainerProtos.Result.IO_EXCEPTION);
- }
- this.containerData = containerData;
- this.metrics = metrics;
- }
-
- @Override
- public void force(boolean metadata) throws IOException {
- randomAccessFile.getChannel().force(metadata);
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- int writeBytes = randomAccessFile.getChannel().write(src);
- metrics
- .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
- containerData.updateWriteStats(writeBytes, false);
- return writeBytes;
- }
-
- @Override
- public boolean isOpen() {
- return randomAccessFile.getChannel().isOpen();
- }
-
- @Override
- public void close() throws IOException {
- randomAccessFile.close();
+ super(file, containerData, metrics);
}
@Override
- public String toString() {
- return "KeyValueStreamDataChannel{" +
- "File=" + file.getAbsolutePath() +
- ", containerID=" + containerData.getContainerID() +
- '}';
+ ContainerProtos.Type getType() {
+ return ContainerProtos.Type.StreamWrite;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
similarity index 77%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
index c0570f5d4d..b31e2ccbf4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
@@ -29,19 +29,21 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
/**
- * This class is used to get the DataChannel for streaming.
+ * For write state machine data.
*/
-class KeyValueStreamDataChannel implements StateMachine.DataChannel {
+abstract class StreamDataChannelBase implements StateMachine.DataChannel {
private final RandomAccessFile randomAccessFile;
+
private final File file;
private final ContainerData containerData;
private final ContainerMetrics metrics;
- KeyValueStreamDataChannel(File file, ContainerData containerData,
- ContainerMetrics metrics)
+ StreamDataChannelBase(File file, ContainerData containerData,
+ ContainerMetrics metrics)
throws StorageContainerException {
try {
this.file = file;
@@ -56,23 +58,20 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel {
this.metrics = metrics;
}
- @Override
- public void force(boolean metadata) throws IOException {
- randomAccessFile.getChannel().force(metadata);
+ abstract ContainerProtos.Type getType();
+
+ private FileChannel getChannel() {
+ return randomAccessFile.getChannel();
}
@Override
- public int write(ByteBuffer src) throws IOException {
- int writeBytes = randomAccessFile.getChannel().write(src);
- metrics
- .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
- containerData.updateWriteStats(writeBytes, false);
- return writeBytes;
+ public final void force(boolean metadata) throws IOException {
+ getChannel().force(metadata);
}
@Override
- public boolean isOpen() {
- return randomAccessFile.getChannel().isOpen();
+ public final boolean isOpen() {
+ return getChannel().isOpen();
}
@Override
@@ -80,9 +79,17 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel {
randomAccessFile.close();
}
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ final int writeBytes = getChannel().write(src);
+ metrics.incContainerBytesStats(getType(), writeBytes);
+ containerData.updateWriteStats(writeBytes, false);
+ return writeBytes;
+ }
+
@Override
public String toString() {
- return "KeyValueStreamDataChannel{" +
+ return getClass().getSimpleName() + "{" +
"File=" + file.getAbsolutePath() +
", containerID=" + containerData.getContainerID() +
'}';
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org