You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/06/22 11:01:48 UTC

[GitHub] [ozone] adoroszlai commented on a change in pull request #2337: HDDS-5265. Leverage getSmallFile in ChunkInputStream

adoroszlai commented on a change in pull request #2337:
URL: https://github.com/apache/ozone/pull/2337#discussion_r655872160



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
##########
@@ -1315,7 +1315,7 @@ private OzoneInputStream createInputStream(
     if (feInfo == null) {
       LengthInputStream lengthInputStream = KeyInputStream
           .getFromOmKeyInfo(keyInfo, xceiverClientManager,
-              clientConfig.isChecksumVerify(), retryFunction);
+              clientConfig.isChecksumVerify(), retryFunction, clientConfig);

Review comment:
       The `verifyChecksum` parameter could be removed: the `BlockInputStream` constructor can obtain the value itself via `clientConfig.isChecksumVerify()`.

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -109,45 +111,83 @@
   private int chunkIndexOfPrevPosition;
 
   private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private boolean smallBlock = false;
+  private final OzoneClientConfig clientConfig;
 
+  @SuppressWarnings("parameternumber")
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, Pipeline> refreshPipelineFunction,
+      OzoneClientConfig clientConfig) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshPipelineFunction = refreshPipelineFunction;
+    if (clientConfig != null) {
+      this.smallBlock = (length <= clientConfig.getSmallBlockThreshold());
+    }
+    this.clientConfig = clientConfig;
+  }
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, Pipeline> refreshPipelineFunction) {
+    this(blockId, blockLen, pipeline, token, verifyChecksum,
+        xceiverClientFactory, refreshPipelineFunction, null);
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
-                          Token<OzoneBlockTokenIdentifier> token,
-                          boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
     this(blockId, blockLen, pipeline, token, verifyChecksum,
-        xceiverClientFactory, null);
+        xceiverClientFactory, null, null);

Review comment:
       Can we pass `new OzoneClientConfig()` instead of `null`?  This way we can avoid null checks and use the default configuration seamlessly.

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -109,45 +111,83 @@
   private int chunkIndexOfPrevPosition;
 
   private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private boolean smallBlock = false;
+  private final OzoneClientConfig clientConfig;
 
+  @SuppressWarnings("parameternumber")
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, Pipeline> refreshPipelineFunction,
+      OzoneClientConfig clientConfig) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshPipelineFunction = refreshPipelineFunction;
+    if (clientConfig != null) {
+      this.smallBlock = (length <= clientConfig.getSmallBlockThreshold());
+    }
+    this.clientConfig = clientConfig;
+  }
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, Pipeline> refreshPipelineFunction) {
+    this(blockId, blockLen, pipeline, token, verifyChecksum,
+        xceiverClientFactory, refreshPipelineFunction, null);
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
-                          Token<OzoneBlockTokenIdentifier> token,
-                          boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
     this(blockId, blockLen, pipeline, token, verifyChecksum,
-        xceiverClientFactory, null);
+        xceiverClientFactory, null, null);
   }
   /**
    * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
    * the Container and create the ChunkInputStreams for each Chunk in the Block.
    */
   public synchronized void initialize() throws IOException {
-
-    // Pre-check that the stream has not been intialized already
+    // Pre-check that the stream has not been initialized already
     if (initialized) {
       return;
     }
 
+    // Initialize pipeline and client.
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              ReplicationConfig
+                  .getLegacyFactor(pipeline.getReplicationConfig())))
+          .build();
+    }
+    acquireClient();
+
     List<ChunkInfo> chunks;
-    try {
-      chunks = getChunkInfos();
-    } catch (ContainerNotFoundException ioEx) {
-      refreshPipeline(ioEx);
-      chunks = getChunkInfos();
+    if (smallBlock) {
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(blockID.getLocalID() + "_chunk_" + (chunkIndex + 1))
+          .setOffset(0L).setLen(length)
+          .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+              .setBytesPerChecksum(clientConfig.getBytesPerChecksum())
+              .setType(clientConfig.getChecksumType())
+              .build())
+          .build();
+      chunks = new ArrayList<>();
+      chunks.add(chunkInfo);
+    } else {
+      try {
+        chunks = getChunkInfos();
+      } catch (ContainerNotFoundException ioEx) {
+        refreshPipeline(ioEx);

Review comment:
       `refreshPipeline` may update the pipeline, but the client still references the old one, so a new client may need to be acquired.  The new pipeline also needs to be converted to standalone.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
##########
@@ -85,12 +87,22 @@
   public Timeout timeout = Timeout.seconds(300);
 
   @Parameterized.Parameters
-  public static Iterable<Object[]> parameters() {
-    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  public static Collection<Object[]> layouts() {
+    return Arrays.asList(new Object[][] {
+        {ChunkLayOutVersion.FILE_PER_CHUNK, 0},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, BYTES_PER_CHECKSUM},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, CHUNK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, BLOCK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, 0},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, BYTES_PER_CHECKSUM},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, CHUNK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, BLOCK_SIZE}

Review comment:
       This increases integration test run time significantly (almost *an hour* for only `TestChunkInputStream` and `TestKeyInputStream` on my machine).  Are all these combinations really necessary?  Do all test cases depend on the value of `blockThreshold`?
   
   Nit: manually enumerating the parameter matrix seems too verbose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org