Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/10/02 05:31:41 UTC

[GitHub] [ozone] fapifta opened a new pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

fapifta opened a new pull request #2702:
URL: https://github.com/apache/ozone/pull/2702


   ## What changes were proposed in this pull request?
   Accommodate ECBlockOutputStreamEntry to handle the streams required for the data and parity blocks.
   Add changes to ECKeyOutputStream and ECBlockOutputStreamEntryPool so that they use the entries as expected with this change.
   This change helps utilize preallocations properly and simplifies the OMKeyLocationInfo assembly during commitKey, but it does not touch how writing and chunk buffering are handled in ECKeyOutputStream.
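   To illustrate the direction of the refactor, here is a minimal, hypothetical sketch of the stream-cursor idea: a single entry tracks all data and parity streams of a block group, and a cursor selects the stream currently being written. The class and field names below are illustrative only and are not the actual Ozone implementation.

      // Hypothetical sketch only; mirrors the cursor methods discussed in this PR
      // (useNextBlockStream, forceToFirstParityBlock) but with simplified types.
      final class StreamCursorSketch {
        private final int dataCount;   // number of data blocks in the group
        private final int parityCount; // number of parity blocks in the group
        private int currentStreamIdx = 0;

        StreamCursorSketch(int dataCount, int parityCount) {
          this.dataCount = dataCount;
          this.parityCount = parityCount;
        }

        // Advance to the next data/parity stream of the block group.
        void useNextBlockStream() {
          currentStreamIdx++;
        }

        // Jump to the first parity stream; parity indexes start at dataCount.
        void forceToFirstParityBlock() {
          currentStreamIdx = dataCount;
        }

        boolean isWritingParity() {
          return currentStreamIdx >= dataCount;
        }

        int totalStreams() {
          return dataCount + parityCount;
        }
      }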
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-5755
   
   ## How was this patch tested?
   Executed the EC-relevant tests and added new tests to check the functionality.
   




[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722111946



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       Initialization became a two-stage process here. In the base BlockOutputStreamEntry class there is the checkStream() method, which is the only place we call createOutputStream(). The createOutputStream method initializes the data and parity ECBlockOutputStreams in one go and places them into the blockOutputStreams array.
   So this class is initialized if there is any non-null element in the blockOutputStreams array; therefore isInitialized checks whether the first element of that array is non-null and returns true if it is.
   
   The ECBlockOutputStreams themselves are cheap to create, and unless anything is written to one, they do not initiate any communication towards the DataNodes, nor do they do anything at close or flush.
   With that, I believe we are fine initializing all the streams at once in createOutputStream and throwing away the unused ones at the end. Instantiating streams on demand might be a later optimization, but I am not sure yet that gaining those microseconds when we start writing after a block allocation is worth the extra complexity it would bring.
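   
   As a minimal sketch of the eager-initialization pattern described above (hypothetical names, not the actual Ozone classes), assuming a plain OutputStream array stands in for the ECBlockOutputStreams:

       // Hypothetical sketch: all per-replica streams are created in one go and
       // isInitialized() only checks the array; stream creation itself is cheap.
       import java.io.ByteArrayOutputStream;
       import java.io.OutputStream;

       class EagerStreamEntrySketch {
         private OutputStream[] streams;   // one slot per data/parity replica
         private final int requiredNodes;  // data + parity count

         EagerStreamEntrySketch(int requiredNodes) {
           this.requiredNodes = requiredNodes;
         }

         boolean isInitialized() {
           // Initialized once the array exists and its first slot is filled.
           return streams != null && streams[0] != null;
         }

         void checkStream() {
           if (!isInitialized()) {
             createOutputStreams();        // single call creates every stream
           }
         }

         private void createOutputStreams() {
           streams = new OutputStream[requiredNodes];
           for (int i = 0; i < requiredNodes; i++) {
             // Cheap to construct; nothing talks to a DataNode until a write.
             streams[i] = new ByteArrayOutputStream();
           }
         }
       }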






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727504537



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      if (blockOutputStreams[i] != null) {
+        blockOutputStreams[i].flush();
+      }
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the underlying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.

Review comment:
       I replaced the comment and removed the code from here. I also updated the javadoc of the class and moved the comment about blockID into underlyingBlockID().






[GitHub] [ozone] umamaheswararao merged pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao merged pull request #2702:
URL: https://github.com/apache/ozone/pull/2702


   




[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722104338



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();

Review comment:
       ECBlockOutputStreamEntry will manage the individual streams. In order for it to create the streams and also to handle preallocations, we need the full EC Pipeline here, as it provides all the data necessary to create the internal single-replica pipelines (and with them the BlockOutputStreams) to write the data to, and to report the key location info back to OM at the end of the write during commitKey.
   
   I think only one change is necessary here: instead of getPipeline() in the next line, I should have used ecPipeline to reference the full pipeline, making it obvious where the nodes come from and differentiating it from the single-replica pipelines created by the createSingleECBlockPipeline() method, which provides them to the internal block output streams 8 lines below.
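   
   As a hedged sketch of this idea (illustrative types only, not Ozone's Pipeline API): each node of the full EC pipeline is assigned a 1-based replica index, with indexes 1..data carrying data and the remaining indexes carrying parity.

       // Hypothetical sketch: derive a (replicaIndex -> node) view of the full EC
       // pipeline; this is not the real createSingleECBlockPipeline() signature.
       import java.util.LinkedHashMap;
       import java.util.List;
       import java.util.Map;

       final class SingleReplicaPipelineSketch {
         static Map<Integer, String> replicaAssignment(List<String> ecPipelineNodes) {
           Map<Integer, String> assignment = new LinkedHashMap<>();
           for (int i = 0; i < ecPipelineNodes.size(); i++) {
             // Node i serves replica index i + 1 (1-based, data first, then parity).
             assignment.put(i + 1, ecPipelineNodes.get(i));
           }
           return assignment;
         }
       }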






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722005530



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
##########
@@ -199,64 +191,6 @@ public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
     }
   }
 
-

Review comment:
       This test is not feasible in the new code structure.
   The test reached into the ECBlockOutputStreams, each of which handled a single BlockOutputStream earlier; based on their Pipeline information it acquired a client from their XceiverClientFactory and then checked that the acquired clients are distinct and that none of them equals another.
   In the new code structure the ECBlockOutputStreamEntry knows about the full EC Pipeline and manages the data and parity streams together; the individual pipelines and streams are hidden from the outside.
   
   The two new tests in the new ECBlockOutputStreamEntryTest class cover the same concern: they ensure that the Pipelines created by the ECBlockOutputStreamEntry#createSingleECBlockPipeline method are distinct when the DatanodeDetails define the same host but a different port for EC replication, and a second test ensures that if the port is also the same, we reuse the client.
   
   I believe this is important so the tests also work in a MiniCluster, where the DatanodeDetails differ only in the DataNode port. In the Ratis replication case it was not really important, as we do not differentiate blocks there, while in EC we need to know the order of data and parity blocks, we need to communicate with all the nodes, and we cannot properly share a client between them.
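   
   A small sketch of the distinctness check described above, under the assumption that a client target is keyed by "host:port" (illustrative only, not the real test or the DatanodeDetails API):

       import java.util.Arrays;
       import java.util.HashSet;
       import java.util.List;
       import java.util.Set;

       final class DistinctClientTargetSketch {
         // Targets are distinct when their "host:port" keys are all different.
         static boolean allDistinct(List<String> hostPortTargets) {
           Set<String> unique = new HashSet<>(hostPortTargets);
           return unique.size() == hostPortTargets.size();
         }

         public static void main(String[] args) {
           // Same host, different EC replication ports -> distinct clients expected.
           System.out.println(allDistinct(Arrays.asList(
               "127.0.0.1:10001", "127.0.0.1:10002", "127.0.0.1:10003"))); // true
           // Same host and the same port -> the client is expected to be reused.
           System.out.println(allDistinct(Arrays.asList(
               "127.0.0.1:10001", "127.0.0.1:10001"))); // false
         }
       }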






[GitHub] [ozone] fapifta commented on pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#issuecomment-941237977


   As we discussed today, it seems the ECBlockInputStream change is unnecessary; it shed light on a bug on the mock cluster side in the mock-based tests, where the original pipeline we got from allocateBlock did not contain the replica indexes, while in a real cluster they are there for EC.
   I have removed the check from ECBlockInputStream and added the indexes to the pipeline in the last commit.
   
   @umamaheswararao as we discussed, this is probably the last change necessary, but please kindly review and let me know if you see anything else with the current patch.
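   
   A tiny sketch of the invariant the mock pipeline was violating (illustrative types, not the real mock or Pipeline API): every node of an EC pipeline should carry a replica index.

       import java.util.List;
       import java.util.Map;

       final class ReplicaIndexInvariantSketch {
         // True only if every node in the EC pipeline has a replica index assigned.
         static boolean everyNodeHasReplicaIndex(List<String> nodes,
             Map<String, Integer> replicaIndexes) {
           return nodes.stream().allMatch(replicaIndexes::containsKey);
         }
       }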




[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727476347



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       Not sure what you mean?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       ah... spaces... though this way it requires wrapping... I was hesitant where to wrap though to help readability... :)

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      if (blockOutputStreams[i] != null) {
+        blockOutputStreams[i].flush();
+      }
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the underlying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.

Review comment:
       I replaced the comment and removed the code from here. I also updated the javadoc of the class and moved the comment about blockID into underlyingBlockID().






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727503526



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       ah... spaces... though this way it requires wrapping... I was hesitant where to wrap though to help readability... :)






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722115875



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
   }
 
   @Override
-  ECBlockOutputStream createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
-    return this.out;
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      stream.close();
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the underlying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.
+//    return blockStreams()
+//        .mapToLong(BlockOutputStream::getTotalAckDataLength)
+//        .sum();

Review comment:
       ECBlockOutputStream extends BlockOutputStream, as it did before. Earlier, ECBlockOutputStream did not override this method, so the method in BlockOutputStream was called, and that method returned a hardwired 0.
   RatisBlockOutputStream, on the other hand, maintains the acked data length via CommitWatcher and returns that.
   
   Our EC logic does not hold or check for any ack from the DNs via the gRPC client and does not maintain the acked length so far, so it seems safer for now to just return 0 instead of calling the method of the underlying streams, as we may not do this via those streams later on. I have added the naive implementation for discussion. I am also unsure whether the acked length should account for the parity block acks or only for the data block acks.
   And in any case, even if we sum the results, we will get 0 anyway; this way it is easier to spot that the logic to calculate the acked length is not yet developed. Did I miss something?
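   
   For discussion, a hypothetical sketch of what an ack-aware calculation could look like once per-stream acks exist (the AckAwareStream interface is an assumption, not a real Ozone type, and whether parity acks should count is exactly the open question above):

       import java.util.List;

       final class AckLengthSketch {
         interface AckAwareStream {
           long getTotalAckDataLength();
           boolean isParity();
         }

         // Sum the acked bytes of the data streams only, ignoring parity streams.
         static long totalAckedDataLength(List<AckAwareStream> streams) {
           return streams.stream()
               .filter(s -> !s.isParity())
               .mapToLong(AckAwareStream::getTotalAckDataLength)
               .sum();
         }
       }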






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727476347



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       Not sure what you mean?






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r726855169



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       As we discussed live, once the client is created it also connects, so I have added the lazy initialization logic.






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722104338



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();

Review comment:
       ECBlockOutputStreamEntry will manage the individual streams. In order for it to be able to create the streams and also to handle preallocations, we need the full EC Pipeline here, as it provides all the data necessary to create the internal single-replica pipelines (and with them the BlockOutputStreams) to write the data to, and to report the key location info back to OM at the end of the write during commitKey.
   
   I think one change is necessary here: instead of getPipeline() in the next line I should have used ecPipeline to reference the full pipeline, making it obvious where the nodes come from, and to differentiate it from the single-replica pipelines created by the createSingleECBlockPipeline() method, which provides them to the internal block output streams 8 lines below.
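   For illustration, a minimal sketch of the idea (the helper shape and builder calls below are my assumption for illustration, not the exact patch):
   
       // Hedged sketch: derive a one-node pipeline from the full EC pipeline so a
       // single ECBlockOutputStream only talks to its own datanode, while keeping
       // the pipeline id and replication config of the whole block group.
       Pipeline createSingleECBlockPipeline(Pipeline ecPipeline,
           DatanodeDetails node, int replicaIndex) {
         Map<DatanodeDetails, Integer> indexForNode = new HashMap<>();
         indexForNode.put(node, replicaIndex);
         return Pipeline.newBuilder()
             .setId(ecPipeline.getId())
             .setReplicationConfig(ecPipeline.getReplicationConfig())
             .setState(ecPipeline.getPipelineState())
             .setNodes(ImmutableList.of(node))
             .setReplicaIndexes(indexForNode)
             .build();
       }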






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722097625



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
##########
@@ -283,26 +338,21 @@ void commitKey(long offset) throws IOException {
     }
   }
 
-  public BlockOutputStreamEntry getCurrentStreamEntry() {
+  BlockOutputStreamEntry getCurrentStreamEntry() {
     if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
       return null;
     } else {
       return streamEntries.get(currentStreamIndex);
     }
   }
 
-  public int getCurrIdx(){
-    return currentStreamIndex;
-  }
-
-  public void setCurrIdx(int currIdx) {
-    this.currentStreamIndex = currIdx;
-  }
-
-  public void updateToNextStream(int rotation) {
-    currentStreamIndex = (currentStreamIndex + 1) % rotation;
-  }
-
+  /**
+   * Allocates a new block with OM, currently ECKeyOutputStream is an exclusive

Review comment:
       I have corrected this, and also removed the EC-related notes and descriptions in #2709.






[GitHub] [ozone] umamaheswararao commented on pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#issuecomment-942977949


   Thanks for updating the patch. Latest patch looks good to me.




[GitHub] [ozone] umamaheswararao commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r726790072



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      if (blockOutputStreams[i] != null) {
+        blockOutputStreams[i].flush();
+      }
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.

Review comment:
       Is this commented-out code intentionally left? Please also check the comment formatting.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       Has the code below lost its formatting?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       After the last offline discussion, we agreed to implement lazy initialization. So, in the partial stripe case there is still a chance that we don't initialize all streams, and hence they don't all need to satisfy the isClosed call. Should we explicitly make only the required (initialized) streams match this condition?
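   Something like this minimal sketch is what I have in mind (an assumption, not the final patch): only the streams that were actually initialized take part in the check, so a partial stripe does not block it.
   
       // Hedged sketch: skip the lazily uninitialized (null) streams when
       // deciding whether the entry is closed; needs java.util.Arrays and
       // java.util.Objects.
       @Override
       boolean isClosed() {
         if (!isInitialized()) {
           return false;
         }
         return Arrays.stream(blockOutputStreams)
             .filter(Objects::nonNull)
             .allMatch(BlockOutputStream::isClosed);
       }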






[GitHub] [ozone] fapifta commented on pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#issuecomment-941237977


   As we discussed today, it seems that the ECBlockInputStream change is unnecessary; it also shed light on a bug on the mock cluster side in mock-based tests, where the original pipeline we got from allocateBlock did not contain the replica indices, while on a real cluster they are present for EC.
   I have removed the check from ECBlockInputStream and added the indexes to the pipeline in the last commit.
   
   @umamaheswararao, as we discussed this is probably the last change necessary, but please do the review and let me know if you see anything else with the current patch.
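   For reference, a hypothetical sketch of the mock-side fix (names and builder calls are my assumptions for illustration, not the actual test code): the pipeline returned by the mock allocateBlock carries a replica index for every node, the same way SCM does on a real cluster.
   
       // Hedged sketch; nodes and ecReplicationConfig are assumed to come from
       // the mock cluster setup. EC replica indexes are 1-based, matching the
       // currentStreamIdx + 1 convention in the diff above.
       Map<DatanodeDetails, Integer> replicaIndexes = new HashMap<>();
       for (int i = 0; i < nodes.size(); i++) {
         replicaIndexes.put(nodes.get(i), i + 1);
       }
       Pipeline pipeline = Pipeline.newBuilder()
           .setId(PipelineID.randomId())
           .setReplicationConfig(ecReplicationConfig)
           .setState(Pipeline.PipelineState.OPEN)
           .setNodes(nodes)
           .setReplicaIndexes(replicaIndexes)
           .build();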






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r727484782



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
   This line needs to be reformatted according to the code style.
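   For example, formatted with the usual checkstyle spacing (behavior unchanged):
   
       for (int i = 0; i <= currentStreamIdx && i < blockOutputStreams.length; i++) {
         if (blockOutputStreams[i] != null) {
           blockOutputStreams[i].flush();
         }
       }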






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r726855867



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();

Review comment:
       Resolving, as we discussed this and agreed that this one is fine and should stay this way.






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722094936



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
##########
@@ -36,7 +36,14 @@
 import com.google.common.annotations.VisibleForTesting;
 
 /**

Review comment:
       I have consolidated the changes that can go into master under HDDS-5816; the PR for that is up as #2709. I am preparing this PR to be based on those changes, so we will need to wait until they are merged into master and then merged into the base EC branch.






[GitHub] [ozone] umamaheswararao commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r721780841



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();

Review comment:
       I think we don't need to pass the full pipeline. We can just create a single-node pipeline for that stream.






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722027658



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
##########
@@ -27,7 +27,6 @@
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSExceptionMessages;

Review comment:
       Added https://issues.apache.org/jira/browse/HDDS-5815 for this part, and will remove it from this PR.
   The PR for HDDS-5815 is #2708.






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722095883



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
##########
@@ -156,10 +180,22 @@ void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
             .setBufferPool(bufferPool)
-            .setToken(subKeyInfo.getToken());
-    streamEntries.add(builder.build());
+            .setToken(subKeyInfo.getToken())
+            .build();
+
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    streamEntries.add(createStreamEntry(subKeyInfo));
   }
 
+  /**

Review comment:
       They are added through #2709, and I will be removing them from this PR. As a consequence, this PR will be ready for review/merge only after master has the changes and they are merged into the base EC branch.






[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r722117432



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
   }
 
   @Override
-  ECBlockOutputStream createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
-    return this.out;
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      stream.close();
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.
+//    return blockStreams()
+//        .mapToLong(BlockOutputStream::getTotalAckDataLength)
+//        .sum();
+    return 0;
+  }
+
+  /**
+   * Returns the amount of bytes that were attempted to be sent through towards
+   * the DataNodes, and the write call succeeded without an exception.
+   * In EC entries the parity writes does not count into this, as the written
+   * data length represents the attempts of the classes using the entry, and
+   * not the attempts of the entry itself.
+   * @return 0 if the stream is not initialized, the amount of data bytes that
+   *    were attempted to be written to the entry.
+   */
+  //TODO: this might become problematic, and should be tested during the
+  //      implementation of retries and error handling, as if there is a retry,
+  //      then some data might have to be written twice.
+  //      This current implementation is an assumption here.
+  //      We might need to account the parity bytes written here, or elsewhere.
+  @Override
+  long getWrittenDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    return blockStreams()

Review comment:
      The next line limits the stream pipeline to just the data blocks. As the createStream logic creates all the streams for this block group, I believe it is safe to sum up the data lengths of the first replicationConfig.getData() streams in the array. Do I misunderstand your question, or the contract of this method in the EC case?
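      For illustration, the continuation of that chain could look roughly like the sketch below; this is only a sketch of the idea described above (limit to the data streams, then sum), not a quote of the patch:

          return blockStreams()
              .limit(replicationConfig.getData())
              .mapToLong(BlockOutputStream::getWrittenDataLength)
              .sum();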





[GitHub] [ozone] umamaheswararao commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r721766624



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
##########
@@ -283,26 +338,21 @@ void commitKey(long offset) throws IOException {
     }
   }
 
-  public BlockOutputStreamEntry getCurrentStreamEntry() {
+  BlockOutputStreamEntry getCurrentStreamEntry() {
     if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
       return null;
     } else {
       return streamEntries.get(currentStreamIndex);
     }
   }
 
-  public int getCurrIdx(){
-    return currentStreamIndex;
-  }
-
-  public void setCurrIdx(int currIdx) {
-    this.currentStreamIndex = currIdx;
-  }
-
-  public void updateToNextStream(int rotation) {
-    currentStreamIndex = (currentStreamIndex + 1) % rotation;
-  }
-
+  /**
+   * Allocates a new block with OM, currently ECKeyOutputStream is an exclusive

Review comment:
       KeyOutputStream also uses this method. I would suggest avoiding this doc here. If you want to improve the docs, let's file a master JIRA and improve them there. Otherwise we will have a bigger diff compared to master and will need to deal with merge conflicts. So we are following the practice that if anything qualifies for master, we file master JIRAs and commit there.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
##########
@@ -199,64 +191,6 @@ public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
     }
   }
 
-

Review comment:
       Why was this test removed?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
##########
@@ -36,7 +36,14 @@
 import com.google.common.annotations.VisibleForTesting;
 
 /**

Review comment:
       At a high level, a lot of the changes in this file could go into master. So, I would suggest making minimal changes here. All EC-specific changes should go into the EC classes. The EC-specific javadoc explanation can go into ECBlockOutputStreamEntry.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
   }
 
   @Override
-  ECBlockOutputStream createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
-    return this.out;
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      stream.close();
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.
+//    return blockStreams()
+//        .mapToLong(BlockOutputStream::getTotalAckDataLength)
+//        .sum();
+    return 0;
+  }
+
+  /**
+   * Returns the amount of bytes that were attempted to be sent through towards
+   * the DataNodes, and the write call succeeded without an exception.
+   * In EC entries the parity writes does not count into this, as the written
+   * data length represents the attempts of the classes using the entry, and
+   * not the attempts of the entry itself.
+   * @return 0 if the stream is not initialized, the amount of data bytes that
+   *    were attempted to be written to the entry.
+   */
+  //TODO: this might become problematic, and should be tested during the
+  //      implementation of retries and error handling, as if there is a retry,
+  //      then some data might have to be written twice.
+  //      This current implementation is an assumption here.
+  //      We might need to account the parity bytes written here, or elsewhere.
+  @Override
+  long getWrittenDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    return blockStreams()

Review comment:
       Where are we skipping the parity block lengths in the block group length?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
##########
@@ -27,7 +27,6 @@
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSExceptionMessages;

Review comment:
       Why do we need the FileEncryption-related changes here? Same as above: could they qualify for master?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();

Review comment:
       I think we don't need to pass the full pipeline. We can just create a single-node pipeline for that stream.
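       For context, deriving such a single-node pipeline from the EC pipeline could look roughly like the sketch below. The builder methods shown (setReplicaIndexes in particular) are assumptions for illustration and may not match the actual Pipeline API:

           // Hypothetical sketch: build a one-node pipeline that carries the
           // replica index for the given datanode.
           private Pipeline createSingleECBlockPipeline(Pipeline ecPipeline,
               DatanodeDetails node, int replicaIndex) {
             Map<DatanodeDetails, Integer> indexes = new HashMap<>();
             indexes.put(node, replicaIndex);
             return Pipeline.newBuilder()
                 .setId(ecPipeline.getId())
                 .setReplicationConfig(ecPipeline.getReplicationConfig())
                 .setState(ecPipeline.getPipelineState())
                 .setNodes(ImmutableList.of(node))
                 .setReplicaIndexes(indexes)
                 .build();
           }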

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
##########
@@ -1071,7 +1071,7 @@ public OzoneOutputStream createMultipartKey(String volumeName,
     keyOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getOpenVersion());

Review comment:
       Should we consider moving these changes to master?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
   }
 
   @Override
-  ECBlockOutputStream createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
-    return this.out;
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      stream.close();
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.
+//    return blockStreams()
+//        .mapToLong(BlockOutputStream::getTotalAckDataLength)
+//        .sum();

Review comment:
       Why are we returning 0?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
##########
@@ -156,10 +180,22 @@ void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
             .setBufferPool(bufferPool)
-            .setToken(subKeyInfo.getToken());
-    streamEntries.add(builder.build());
+            .setToken(subKeyInfo.getToken())
+            .build();
+
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    streamEntries.add(createStreamEntry(subKeyInfo));
   }
 
+  /**

Review comment:
       Looks like a lot of these javadoc improvements should go to master.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       What if some streams are not initialized?
   In the case when the file has 1MB, we will not initialize the 2nd and 3rd streams, as we don't write anything to them.





[GitHub] [ozone] fapifta commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r726856435



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
   }
 
   @Override
-  ECBlockOutputStream createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
-    return this.out;
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      stream.close();
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.
+//    return blockStreams()
+//        .mapToLong(BlockOutputStream::getTotalAckDataLength)
+//        .sum();

Review comment:
       We cleared this one up as well during our discussion.





[GitHub] [ozone] umamaheswararao commented on a change in pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#discussion_r726790072



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      if (blockOutputStreams[i] != null) {
+        blockOutputStreams[i].flush();
+      }
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }
+    return blockStreams().allMatch(BlockOutputStream::isClosed);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for (ECBlockOutputStream stream : blockOutputStreams) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    updateBlockID(underlyingBlockID());
+  }
+
+  @Override
+  long getTotalAckDataLength() {
+    if (!isInitialized()) {
+      return 0;
+    }
+    // blockID is the same for EC blocks inside one block group managed by
+    // this entry.
+    updateBlockID(underlyingBlockID());
+    //TODO: A future implementation might require something like this, but
+    // currently as ECBlockOutputStream is inheriting from BlockOutputStream
+    // this method returns 0 all the time from the unrelying streams.
+    // After we have a confirmed ack mechanism, like there is in
+    // RatisBlockOutputStream, we should revisit this part, and decide if we
+    // want to filter out parity here for example.

Review comment:
       Is this commented-out code intentionally left here? Please also check the comment formatting.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       Has the code below lost its formatting?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,53 +17,259 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    Preconditions.assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+  }
+
+  @Override
+  void createOutputStream() throws IOException {
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams =
+        new ECBlockOutputStream[nodes.size()];
+    for (int i = 0; i< getPipeline().getNodes().size(); i++) {
+      blockOutputStreams[i] = new ECBlockOutputStream(
+          getBlockID(),
+          getXceiverClientManager(),
+          createSingleECBlockPipeline(ecPipeline, nodes.get(i), i+1),
+          getBufferPool(),
+          getConf(),
+          getToken());
+    }
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }
+    for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
+      blockOutputStreams[i].flush();
+    }
+  }
+
+  @Override
+  boolean isClosed() {
+    if (!isInitialized()) {
+      return false;
+    }

Review comment:
       After the last offline discussion, we agreed to implement lazy initialization. So, in the partial stripe case, there is still a chance that we don't initialize all streams, and hence they don't need to match the isClosed call. So, should we explicitly take care that only the required streams need to match this condition?
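       As an illustration of checking only the streams that were actually created, the condition could be narrowed roughly like this (a sketch of the suggestion only, not the patch itself):

           // Sketch: with lazy initialization, skip slots that were never created.
           return Arrays.stream(blockOutputStreams)
               .filter(s -> s != null)
               .allMatch(BlockOutputStream::isClosed);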

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
##########
@@ -17,56 +17,296 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.ratis.util.Preconditions.assertInstanceOf;
 
 /**
  * Helper for {@link ECBlockOutputStream}.
  */
 public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
-  private final boolean isParityStreamEntry;
-  private ECBlockOutputStream out;
+  private ECBlockOutputStream[] blockOutputStreams;
+  private final ECReplicationConfig replicationConfig;
+  private Map<DatanodeDetails, Integer> replicaIndicies = new HashMap<>();
+  private long length;
+
+  private int currentStreamIdx = 0;
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, boolean isParityStream) {
+      OzoneClientConfig config) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
         token, config);
-    this.isParityStreamEntry = isParityStream;
+    assertInstanceOf(
+        pipeline.getReplicationConfig(), ECReplicationConfig.class);
+    this.replicationConfig =
+        (ECReplicationConfig) pipeline.getReplicationConfig();
+    this.length = replicationConfig.getData() * length;
+  }
+
+  @Override
+  void checkStream() throws IOException {
+    if (!isInitialized()) {
+      blockOutputStreams =
+          new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
+    }
+    if (blockOutputStreams[currentStreamIdx] == null) {
+      createOutputStream();
+    }
   }
 
   @Override
   void createOutputStream() throws IOException {
-    this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
-        getPipeline(), getBufferPool(), getConf(), getToken());
+    Pipeline ecPipeline = getPipeline();
+    List<DatanodeDetails> nodes = getPipeline().getNodes();
+    blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
+        getBlockID(),
+        getXceiverClientManager(),
+        createSingleECBlockPipeline(
+            ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
+        getBufferPool(),
+        getConf(),
+        getToken());
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    if (!isInitialized()) {
+      return null;
+    }
+    checkState(blockOutputStreams[currentStreamIdx] != null);
+    return blockOutputStreams[currentStreamIdx];
+  }
+
+  @Override
+  boolean isInitialized() {
+    return blockOutputStreams != null;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
   }
 
-  public ECBlockOutputStream getOutputStream() {
-    return out;
+  public int getCurrentStreamIdx() {
+    return currentStreamIdx;
+  }
+
+  public void useNextBlockStream() {
+    currentStreamIdx++;
+  }
+
+  public void forceToFirstParityBlock(){
+    currentStreamIdx = replicationConfig.getData();
+  }
+
+  public void resetToFirstEntry(){
+    currentStreamIdx = 0;
+  }
+
+  @Override
+  void incCurrentPosition() {
+    if (isWritingParity()) {
+      return;
+    }
+    super.incCurrentPosition();
+  }
+
+  @Override
+  void incCurrentPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    super.incCurrentPosition(len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!isInitialized()) {
+      return;
+    }

Review comment:
       for(int i=0; i<=currentStreamIdx && i<blockOutputStreams.length; i++) {
   This line requires code formatting.
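   For reference, a checkstyle-friendly formatting of that loop, matching the body shown earlier in this diff, would read (whitespace only, no behavior change):

       for (int i = 0; i <= currentStreamIdx && i < blockOutputStreams.length; i++) {
         if (blockOutputStreams[i] != null) {
           blockOutputStreams[i].flush();
         }
       }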





[GitHub] [ozone] fapifta commented on pull request #2702: HDDS-5755 EC: Refactor ECBlockOutputStreamEntry to accommodate all block group related ECBlockOuputStreams.

Posted by GitBox <gi...@apache.org>.
fapifta commented on pull request #2702:
URL: https://github.com/apache/ozone/pull/2702#issuecomment-932686941


   @umamaheswararao you can take a look at the current state if you wish. I marked this as a draft since there are still some JUnit tests I would like to add to it, and some things will require polishing.
   Here is what I am tracking:
   - checkstyle will fail for sure, as some long lines remain
   - ECBlockOutputStreamEntry#getTotalAckDataLength and ECBlockOutputStreamEntry#getWrittenDataLength do not consider parity, which I believed should fail some tests, but it did not; hence I would like to add some new tests before touching that (if you have any input on this, feel free to let me know)
   - Some method reordering and readability changes will come for sure.

