You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/09/20 17:05:37 UTC

[GitHub] [ozone] sodonnel commented on a change in pull request #2653: HDDS-5550. EC: Adapt KeyInputStream to read EC Blocks

sodonnel commented on a change in pull request #2653:
URL: https://github.com/apache/ozone/pull/2653#discussion_r712356087



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -19,60 +19,77 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
-import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.function.Function;
 
 /**
  * Class to read data from an EC Block Group.
  */
-public class ECBlockInputStream extends InputStream
-    implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class ECBlockInputStream extends BlockExtendedInputStream {
 
-  private static final int EOF = -1;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockInputStream.class);
 
   private final ECReplicationConfig repConfig;
+  // TODO - HDDS-5741 - remove hardcoded value
+  private static final int HARDCODED_CHUNK_SIZE = 1024;
   private final int ecChunkSize;
-  private final BlockInputStreamProvider streamProvider;
+  private final BlockInputStreamFactory streamFactory;
   private final boolean verifyChecksum;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
   private final OmKeyLocationInfo blockInfo;
   private final DatanodeDetails[] dataLocations;
   private final DatanodeDetails[] parityLocations;
-  private final BlockInputStream[] blockStreams;
+  private final BlockExtendedInputStream[] blockStreams;
   private final int maxLocations;
 
-  private int position = 0;
+  private long position = 0;
   private boolean closed = false;
 
+  public ECBlockInputStream(ECReplicationConfig repConfig,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+
+    this(repConfig, HARDCODED_CHUNK_SIZE, blockInfo, verifyChecksum,
+        xceiverClientFactory, refreshFunction, streamFactory);
+  }
+
+  // TODO - HDDS-5741 - remove this constructor - ecChunkSize should not be
+  //        there
   public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
       OmKeyLocationInfo blockInfo, boolean verifyChecksum,
-      BlockInputStreamProvider streamProvider) {
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
     this.repConfig = repConfig;
+    // TODO - HDDS-5741
     this.ecChunkSize = ecChunkSize;
     this.verifyChecksum = verifyChecksum;
     this.blockInfo = blockInfo;
-    this.streamProvider = streamProvider;
+    this.streamFactory = streamFactory;
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.refreshFunction = refreshFunction;
     this.maxLocations = repConfig.getData() + repConfig.getParity();
     this.dataLocations = new DatanodeDetails[repConfig.getData()];
     this.parityLocations = new DatanodeDetails[repConfig.getParity()];
-    this.blockStreams = new BlockInputStream[repConfig.getData()];
+    this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];

Review comment:
       My plan is to split out the reading logic in ECBLockInputStream and delegate calls to “ECBlockInputStreamRecovery and ECBlockInputStreamHealthy (named something like that).
   
   So there will be a class to do a recovery block read and a class to do a straight read, which is really what we have now, but perhaps extracted into another class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org