You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/05/25 17:46:17 UTC

[GitHub] [ozone] adoroszlai opened a new pull request, #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

adoroszlai opened a new pull request, #3457:
URL: https://github.com/apache/ozone/pull/3457

   ## What changes were proposed in this pull request?
   
   Allow recovering specific replicas, including parity, in `ECBlockReconstructedStripeInputStream`.
   
   https://issues.apache.org/jira/browse/HDDS-6665
   
   ## How was this patch tested?
   
   Added some unit tests.
   
   Regular CI:
   https://github.com/adoroszlai/hadoop-ozone/actions/runs/2385430223


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on PR #3457:
URL: https://github.com/apache/ozone/pull/3457#issuecomment-1146311319

   @sodonnel do you have further comments? 
   Thanks @adoroszlai for updating the patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai commented on pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on PR #3457:
URL: https://github.com/apache/ozone/pull/3457#issuecomment-1149084538

   Thanks @sodonnel, @umamaheswararao for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r887923263


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -154,92 +182,112 @@ public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
     }
   }
 
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " +
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
   }
 
   /**

Review Comment:
   > it may be a good idea to have overloaded API and define it's own expectations?
   
   Yep, we can have a separate method with different javadoc and different precondition (recovery indexes should or should not be set).  They would call the same underlying implementation.  In parameterized test we can either call the separate methods or the common helper -- which one would you prefer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r887923263


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -154,92 +182,112 @@ public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
     }
   }
 
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " +
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
   }
 
   /**

Review Comment:
   > it may be a good idea to have overloaded API and define it's own expectations?
   
   Yep, we can have a separate method with different javadoc and different precondition (recovery indexes should or should not be set).  They would call the same underlying implementation.  In parameterized test we can either call the separate methods or the common helper -- which one would you prefer?
   
   Do you think `recoverPart` is a good name for the separate method, or do you have some other suggestion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai commented on pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on PR #3457:
URL: https://github.com/apache/ozone/pull/3457#issuecomment-1143492132

   Thanks @sodonnel and @umamaheswararao for the comments.  I'll update the PR with javadoc, but wanted some feedback before spending time on that. ;)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
sodonnel commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r886673789


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -411,41 +460,38 @@ private void zeroFill(ByteBuffer buf) {
    * Take the parity indexes which are already used, and the others which are
    * available, and select random indexes to meet numRequired. The resulting
    * list is sorted in ascending order of the indexes.
-   * @param locations The list of locations for all blocks in the block group/
-   * @param numRequired The number of parity chunks needed for reconstruction
-   * @return A list of indexes indicating which parity locations to read.
    */
   @SuppressWarnings("java:S2245") // no need for secure random
-  private List<Integer> selectParityIndexes(
-      DatanodeDetails[] locations, int numRequired) {
-    List<Integer> indexes = new ArrayList<>();
-    List<Integer> selected = new ArrayList<>();
-    ECReplicationConfig repConfig = getRepConfig();
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (locations[i] != null && !failedDataIndexes.contains(i)
-          && decoderInputBuffers[i] == null) {
-        indexes.add(i);
-      }
-      // If we are re-initializing, we want to make sure we are re-using any
-      // previously selected good parity indexes, as the block stream is already
-      // opened.
-      if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+  private SortedSet<Integer> selectInternalInputs(

Review Comment:
   I wonder if it would work, by just passing fewer locations to the class? Eg, if we need to recover one index, and there 4 available where we need to read 3. If we just pass the 3 locations to the class, then it should just use them as preferred nodes as they are the only nodes available. I guess its not quite as good as it cannot fallback if there is a problem reading one of the others.
   
   But I agree that this change is complex enough already. Lets leave preferred nodes for now and add it later if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r886699110


##########
hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java:
##########
@@ -95,26 +95,29 @@ ByteArrayDecodingState convertToByteArrayState() {
   void checkInputBuffers(ByteBuffer[] buffers) {
     int validInputs = 0;
 
-    for (ByteBuffer buffer : buffers) {

Review Comment:
   `validInputs` is not incremented if buffer is `null`.  So to point to a specific element of the input `buffers`, we need another counter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r887175048


##########
hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java:
##########
@@ -95,26 +95,29 @@ ByteArrayDecodingState convertToByteArrayState() {
   void checkInputBuffers(ByteBuffer[] buffers) {
     int validInputs = 0;
 
-    for (ByteBuffer buffer : buffers) {

Review Comment:
   got it. sure. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a diff in pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on code in PR #3457:
URL: https://github.com/apache/ozone/pull/3457#discussion_r886183437


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -99,20 +107,33 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   private ByteBuffer[] decoderInputBuffers;
   // Missing chunks are recovered into these buffers.
   private ByteBuffer[] decoderOutputBuffers;
+
   // Missing indexes to be recovered into the recovered buffers. Required by the
   // EC decoder
-  private int[] missingIndexes;
-  // The blockLocation indexes to use to read data into the dataBuffers.
-  private List<Integer> dataIndexes = new ArrayList<>();
+  private final SortedSet<Integer> missingIndexes = new TreeSet<>();
+
+  // indexes for data, padding and parity blocks
+  private final SortedSet<Integer> dataIndexes;
+  private final SortedSet<Integer> paddingIndexes;
+  private final SortedSet<Integer> parityIndexes;
+  private final SortedSet<Integer> allIndexes;
+
+  // The blockLocation indexes to use to read data into the output buffers
+  private final SortedSet<Integer> selectedIndexes = new TreeSet<>();
+  // indexes to which internal buffers are allocated

Review Comment:
   You mean internal buffer indexes?



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -154,92 +182,112 @@ public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
     }
   }
 
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " +
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
   }
 
   /**

Review Comment:
   Since we have different expectation of input params, it may be a good idea to have overloaded API and define it's own expectations?
   
   It would be great to have an example case in the java doc to better explain how to use this API?
   example: 
   [ 0 1  2.  3. 4] are EC indexes, in that 2 and 4 are missing and need to recover.
   setRecoveryIndexes are 2, 4
   Then we will pass buff (= new ByteBuffer[2]). Once recovered, they are relatively missing 2 is recovered in buf[0]. and missing 4 is recovered in buf[1] ?
   [ correct if my understanding not correct here :-) ]
   



##########
hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java:
##########
@@ -95,26 +95,29 @@ ByteArrayDecodingState convertToByteArrayState() {
   void checkInputBuffers(ByteBuffer[] buffers) {
     int validInputs = 0;
 
-    for (ByteBuffer buffer : buffers) {

Review Comment:
   Nit: there is validInputs counter going on here. you may want to reuse that counter with some valid naming instead of having another counter i ? 



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -411,41 +460,38 @@ private void zeroFill(ByteBuffer buf) {
    * Take the parity indexes which are already used, and the others which are
    * available, and select random indexes to meet numRequired. The resulting
    * list is sorted in ascending order of the indexes.
-   * @param locations The list of locations for all blocks in the block group/
-   * @param numRequired The number of parity chunks needed for reconstruction
-   * @return A list of indexes indicating which parity locations to read.
    */
   @SuppressWarnings("java:S2245") // no need for secure random
-  private List<Integer> selectParityIndexes(
-      DatanodeDetails[] locations, int numRequired) {
-    List<Integer> indexes = new ArrayList<>();
-    List<Integer> selected = new ArrayList<>();
-    ECReplicationConfig repConfig = getRepConfig();
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (locations[i] != null && !failedDataIndexes.contains(i)
-          && decoderInputBuffers[i] == null) {
-        indexes.add(i);
-      }
-      // If we are re-initializing, we want to make sure we are re-using any
-      // previously selected good parity indexes, as the block stream is already
-      // opened.
-      if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+  private SortedSet<Integer> selectInternalInputs(

Review Comment:
   I am wondering if there is a way to leverage or accept favored input nodes here?
   In recovery case, SCM may sends the command to coordinator with available locations included. That available might be picked based on DN side loads factors etc. So, it would be good to consider the passed favored nodes as preferred input locations as first attempt?
   
   But this can be taken care in follow up JIRAs also, need not be in the first cut.
   



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java:
##########
@@ -154,92 +182,112 @@ public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
     }
   }
 
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " +
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
   }
 
   /**

Review Comment:
   Can we define the expected buffs to pass in recovery case? 
   Currently the doc says, we need to pass "This method should be passed a list of byteBuffers which must contain EC Data Number entries"
   But as of now, my understanding is that, we just need to pass torecover buffs only right?
   
   I am assuming from the following code in assign buffers
   
   ```
   if (isOfflineRecovery()) {
         decoderOutputBuffers = bufs;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] adoroszlai merged pull request #3457: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing

Posted by GitBox <gi...@apache.org>.
adoroszlai merged PR #3457:
URL: https://github.com/apache/ozone/pull/3457


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org