You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:33 UTC

[ozone] 28/35: HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3023)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b2810432208965ef08f2d5f7e68af21214f859e8
Author: hao guo <gu...@360.cn>
AuthorDate: Tue Feb 15 23:35:46 2022 +0800

    HDDS-6229. [Ozone-Streaming] Data Channel abstraction on datanode (#3023)
---
 .../keyvalue/impl/KeyValueStreamDataChannel.java   | 56 ++--------------------
 ...DataChannel.java => StreamDataChannelBase.java} | 39 ++++++++-------
 2 files changed, 27 insertions(+), 68 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index c0570f5d4d..14ead4ea86 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -22,69 +22,21 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 
 /**
  * This class is used to get the DataChannel for streaming.
  */
-class KeyValueStreamDataChannel implements StateMachine.DataChannel {
-  private final RandomAccessFile randomAccessFile;
-  private final File file;
-
-  private final ContainerData containerData;
-  private final ContainerMetrics metrics;
-
+class KeyValueStreamDataChannel extends StreamDataChannelBase {
   KeyValueStreamDataChannel(File file, ContainerData containerData,
                             ContainerMetrics metrics)
       throws StorageContainerException {
-    try {
-      this.file = file;
-      this.randomAccessFile = new RandomAccessFile(file, "rw");
-    } catch (FileNotFoundException e) {
-      throw new StorageContainerException("BlockFile not exists with " +
-          "container Id " + containerData.getContainerID() +
-          " file " + file.getAbsolutePath(),
-          ContainerProtos.Result.IO_EXCEPTION);
-    }
-    this.containerData = containerData;
-    this.metrics = metrics;
-  }
-
-  @Override
-  public void force(boolean metadata) throws IOException {
-    randomAccessFile.getChannel().force(metadata);
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    int writeBytes = randomAccessFile.getChannel().write(src);
-    metrics
-        .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
-    containerData.updateWriteStats(writeBytes, false);
-    return writeBytes;
-  }
-
-  @Override
-  public boolean isOpen() {
-    return randomAccessFile.getChannel().isOpen();
-  }
-
-  @Override
-  public void close() throws IOException {
-    randomAccessFile.close();
+    super(file, containerData, metrics);
   }
 
   @Override
-  public String toString() {
-    return "KeyValueStreamDataChannel{" +
-        "File=" + file.getAbsolutePath() +
-        ", containerID=" + containerData.getContainerID() +
-        '}';
+  ContainerProtos.Type getType() {
+    return ContainerProtos.Type.StreamWrite;
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
similarity index 77%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
index c0570f5d4d..b31e2ccbf4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
@@ -29,19 +29,21 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 /**
- * This class is used to get the DataChannel for streaming.
+ * For write state machine data.
  */
-class KeyValueStreamDataChannel implements StateMachine.DataChannel {
+abstract class StreamDataChannelBase implements StateMachine.DataChannel {
   private final RandomAccessFile randomAccessFile;
+
   private final File file;
 
   private final ContainerData containerData;
   private final ContainerMetrics metrics;
 
-  KeyValueStreamDataChannel(File file, ContainerData containerData,
-                            ContainerMetrics metrics)
+  StreamDataChannelBase(File file, ContainerData containerData,
+                        ContainerMetrics metrics)
       throws StorageContainerException {
     try {
       this.file = file;
@@ -56,23 +58,20 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel {
     this.metrics = metrics;
   }
 
-  @Override
-  public void force(boolean metadata) throws IOException {
-    randomAccessFile.getChannel().force(metadata);
+  abstract ContainerProtos.Type getType();
+
+  private FileChannel getChannel() {
+    return randomAccessFile.getChannel();
   }
 
   @Override
-  public int write(ByteBuffer src) throws IOException {
-    int writeBytes = randomAccessFile.getChannel().write(src);
-    metrics
-        .incContainerBytesStats(ContainerProtos.Type.StreamWrite, writeBytes);
-    containerData.updateWriteStats(writeBytes, false);
-    return writeBytes;
+  public final void force(boolean metadata) throws IOException {
+    getChannel().force(metadata);
   }
 
   @Override
-  public boolean isOpen() {
-    return randomAccessFile.getChannel().isOpen();
+  public final boolean isOpen() {
+    return getChannel().isOpen();
   }
 
   @Override
@@ -80,9 +79,17 @@ class KeyValueStreamDataChannel implements StateMachine.DataChannel {
     randomAccessFile.close();
   }
 
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    final int writeBytes = getChannel().write(src);
+    metrics.incContainerBytesStats(getType(), writeBytes);
+    containerData.updateWriteStats(writeBytes, false);
+    return writeBytes;
+  }
+
   @Override
   public String toString() {
-    return "KeyValueStreamDataChannel{" +
+    return getClass().getSimpleName() + "{" +
         "File=" + file.getAbsolutePath() +
         ", containerID=" + containerData.getContainerID() +
         '}';


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org