You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/10/08 07:48:50 UTC

[GitHub] [ozone] adoroszlai commented on a change in pull request #2337: HDDS-5265. Leverage getSmallFile in ChunkInputStream

adoroszlai commented on a change in pull request #2337:
URL: https://github.com/apache/ozone/pull/2337#discussion_r724779761



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -109,45 +111,83 @@
   private int chunkIndexOfPrevPosition;
 
   private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private boolean smallBlock = false;
+  private final OzoneClientConfig clientConfig;
 
+  @SuppressWarnings("parameternumber")
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, Pipeline> refreshPipelineFunction,
+      OzoneClientConfig clientConfig) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshPipelineFunction = refreshPipelineFunction;
+    if (clientConfig != null) {
+      this.smallBlock = (length <= clientConfig.getSmallBlockThreshold());
+    }
+    this.clientConfig = clientConfig;
+  }
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, Pipeline> refreshPipelineFunction) {
+    this(blockId, blockLen, pipeline, token, verifyChecksum,
+        xceiverClientFactory, refreshPipelineFunction, null);
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
-                          Token<OzoneBlockTokenIdentifier> token,
-                          boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
     this(blockId, blockLen, pipeline, token, verifyChecksum,
-        xceiverClientFactory, null);
+        xceiverClientFactory, null, null);
   }
   /**
    * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
    * the Container and create the ChunkInputStreams for each Chunk in the Block.
    */
   public synchronized void initialize() throws IOException {
-
-    // Pre-check that the stream has not been intialized already
+    // Pre-check that the stream has not been initialized already
     if (initialized) {
       return;
     }
 
+    // Initialize pipeline and client.
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              ReplicationConfig
+                  .getLegacyFactor(pipeline.getReplicationConfig())))
+          .build();
+    }
+    acquireClient();
+
     List<ChunkInfo> chunks;
-    try {
-      chunks = getChunkInfos();
-    } catch (ContainerNotFoundException ioEx) {
-      refreshPipeline(ioEx);
-      chunks = getChunkInfos();
+    if (smallBlock) {
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(blockID.getLocalID() + "_chunk_" + (chunkIndex + 1))
+          .setOffset(0L).setLen(length)
+          .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+              .setBytesPerChecksum(clientConfig.getBytesPerChecksum())
+              .setType(clientConfig.getChecksumType())
+              .build())
+          .build();
+      chunks = new ArrayList<>();
+      chunks.add(chunkInfo);
+    } else {
+      try {
+        chunks = getChunkInfos();
+      } catch (ContainerNotFoundException ioEx) {
+        refreshPipeline(ioEx);

Review comment:
       @ChenSammi thanks for the update.  Now if pipeline is refreshed, new client gets created, but old client seems to be leaked.  I guess here we could simply call `handleReadError` instead of `refreshPipeline` to release old client.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org