Posted to commits@ozone.apache.org by so...@apache.org on 2023/02/08 09:26:27 UTC

[ozone] branch master updated: HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fd5b12fd HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)
11fd5b12fd is described below

commit 11fd5b12fd73b0d47dcf53a91346d842658c56d7
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Feb 8 09:26:20 2023 +0000

    HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)
---
 .../ozone/client/io/BadDataLocationException.java  | 32 +++++++++---
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 60 ++++++++++++++++++++--
 .../ozone/client/io/ECBlockInputStreamProxy.java   |  9 ++--
 .../ozone/client/io/TestECBlockInputStream.java    | 52 ++++++++++++++++++-
 4 files changed, 138 insertions(+), 15 deletions(-)
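
The change teaches ECBlockInputStream to keep any extra replicas reported
for an EC index as "spares" and to retry a failed read against a spare
before surfacing the error. A minimal, self-contained sketch of that idea
(pure JDK, simplified names, not the committed code):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SpareLocationSketch {
  // Primary location per EC index (1-based EC index mapped to array slot).
  private final String[] locations = new String[5];
  // Extra replicas per slot, only consulted after the primary fails.
  private final Map<Integer, Deque<String>> spares = new HashMap<>();

  void addLocation(int ecIndex, String node) {
    int slot = ecIndex - 1;
    if (locations[slot] == null) {
      locations[slot] = node;
    } else {
      spares.computeIfAbsent(slot, k -> new ArrayDeque<>()).add(node);
    }
  }

  // Returns true if a spare was promoted and the read should be retried.
  boolean failAndMaybeRetry(int slot) {
    Deque<String> spare = spares.get(slot);
    if (spare == null || spare.isEmpty()) {
      return false;
    }
    locations[slot] = spare.removeFirst();
    return true;
  }

  public static void main(String[] args) {
    SpareLocationSketch s = new SpareLocationSketch();
    s.addLocation(1, "dn-a");
    s.addLocation(1, "dn-b");                   // second replica for index 1
    System.out.println(s.failAndMaybeRetry(0)); // true: dn-b promoted
    System.out.println(s.failAndMaybeRetry(0)); // false: no spare left
  }
}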

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
index dc8f7b09c0..65e124eaf5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.io;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Exception used to indicate a problem with a specific block location, allowing
@@ -27,30 +29,46 @@ import java.io.IOException;
  */
 public class BadDataLocationException extends IOException {
 
-  private DatanodeDetails failedLocation;
+  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+  private int failedLocationIndex;
 
   public BadDataLocationException(DatanodeDetails dn) {
     super();
-    failedLocation = dn;
+    failedLocations.add(dn);
   }
 
   public BadDataLocationException(DatanodeDetails dn, String message) {
     super(message);
-    failedLocation = dn;
+    failedLocations.add(dn);
   }
 
   public BadDataLocationException(DatanodeDetails dn, String message,
       Throwable ex) {
     super(message, ex);
-    failedLocation = dn;
+    failedLocations.add(dn);
   }
 
   public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
     super(ex);
-    failedLocation = dn;
+    failedLocations.add(dn);
   }
 
-  public DatanodeDetails getFailedLocation() {
-    return failedLocation;
+  public BadDataLocationException(DatanodeDetails dn, int failedIndex,
+      Throwable ex) {
+    super(ex);
+    failedLocations.add(dn);
+    failedLocationIndex = failedIndex;
+  }
+
+  public List<DatanodeDetails> getFailedLocations() {
+    return failedLocations;
+  }
+
+  public void addFailedLocations(List<DatanodeDetails> dns) {
+    failedLocations.addAll(dns);
+  }
+
+  public int getFailedLocationIndex() {
+    return failedLocationIndex;
   }
 }
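
With this change the exception reports every replica that failed for one
index, plus the index itself. An illustrative helper (not part of the
commit; it only assumes the Ozone client artifacts are on the classpath)
showing how a caller might summarise it:

import java.util.List;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;

final class BadLocationSummary {
  private BadLocationSummary() {
  }

  // Reports the stream index that could not be served and every replica
  // that was tried for it, using the new list-valued accessor.
  static String of(BadDataLocationException e) {
    List<DatanodeDetails> failed = e.getFailedLocations();
    StringBuilder sb = new StringBuilder("index ")
        .append(e.getFailedLocationIndex())
        .append(" failed on ").append(failed.size()).append(" replica(s):");
    for (DatanodeDetails dn : failed) {
      sb.append(' ').append(dn);
    }
    return sb.toString();
  }
}
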
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index ed57ea5a1e..5f491c4d5a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -36,8 +36,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 /**
@@ -58,6 +64,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
   private final BlockLocationInfo blockInfo;
   private final DatanodeDetails[] dataLocations;
   private final BlockExtendedInputStream[] blockStreams;
+  private final Map<Integer, LinkedList<DatanodeDetails>> spareDataLocations
+      = new HashMap<>();
+  private final List<DatanodeDetails> failedLocations = new ArrayList<>();
   private final int maxLocations;
 
   private long position = 0;
@@ -263,7 +272,13 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
       throw new IndexOutOfBoundsException("The index " + index + " is greater "
           + "than the EC Replication Config (" + repConfig + ")");
     }
-    dataLocations[index - 1] = location;
+    int arrayIndex = index - 1;
+    if (dataLocations[arrayIndex] == null) {
+      dataLocations[arrayIndex] = location;
+    } else {
+      spareDataLocations.computeIfAbsent(arrayIndex,
+          k -> new LinkedList<>()).add(location);
+    }
   }
 
   protected long blockLength() {
@@ -274,6 +289,45 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     return blockLength() - position;
   }
 
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    return read(ByteBuffer.wrap(b, off, len));
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    while (true) {
+      int currentBufferPosition = byteBuffer.position();
+      long currentPosition = getPos();
+      try {
+        return super.read(byteBuffer);
+      } catch (BadDataLocationException e) {
+        int failedIndex = e.getFailedLocationIndex();
+        if (shouldRetryFailedRead(failedIndex)) {
+          byteBuffer.position(currentBufferPosition);
+          seek(currentPosition);
+        } else {
+          e.addFailedLocations(failedLocations);
+          throw e;
+        }
+      }
+    }
+  }
+
+  private boolean shouldRetryFailedRead(int failedIndex) throws IOException {
+    LinkedList<DatanodeDetails> spare = spareDataLocations.get(failedIndex);
+    if (spare != null && spare.size() > 0) {
+      failedLocations.add(dataLocations[failedIndex]);
+      dataLocations[failedIndex] = spare.removeFirst();
+      if (blockStreams[failedIndex] != null) {
+        blockStreams[failedIndex].close();
+        blockStreams[failedIndex] = null;
+      }
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Read from the internal BlockInputStreams one EC cell at a time into the
    * strategy buffer. This call may read from several internal BlockInputStreams
@@ -294,15 +348,15 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
 
     int totalRead = 0;
     while (strategy.getTargetLength() > 0 && remaining() > 0) {
+      int currentIndex = currentStreamIndex();
       try {
-        int currentIndex = currentStreamIndex();
         BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
         int read = readFromStream(stream, strategy);
         totalRead += read;
         position += read;
       } catch (IOException ioe) {
         throw new BadDataLocationException(
-            dataLocations[currentStreamIndex()], ioe);
+            dataLocations[currentIndex], currentIndex, ioe);
       }
     }
     return totalRead;
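
The new read() override retries the same range after a spare is promoted:
it remembers the buffer position and the stream position, and restores both
before re-reading. A pure-JDK sketch of that retry-safe pattern (stand-in
names and types, not the Ozone classes):

import java.io.IOException;
import java.nio.ByteBuffer;

abstract class RetryingRead {
  abstract long getPos();
  abstract void seek(long pos) throws IOException;
  abstract int readOnce(ByteBuffer buf) throws IOException;
  abstract boolean promoteSpare(int failedIndex); // true if a retry is possible

  final int read(ByteBuffer buf) throws IOException {
    while (true) {
      int bufPos = buf.position();
      long streamPos = getPos();
      try {
        return readOnce(buf);
      } catch (FailedIndexException e) {
        if (!promoteSpare(e.index)) {
          throw e;              // no spare left: surface the failure
        }
        buf.position(bufPos);   // drop any bytes from the failed attempt
        seek(streamPos);        // re-read the same range from the spare
      }
    }
  }

  static final class FailedIndexException extends IOException {
    final int index;
    FailedIndexException(int index) {
      this.index = index;
    }
  }
}
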
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index c1356230b2..47758eae02 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -182,7 +182,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
         }
 
         failoverToReconstructionRead(
-            ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+            ((BadDataLocationException) e).getFailedLocations(), lastPosition);
         buf.reset();
         totalRead += read(buf);
       } else {
@@ -193,9 +193,10 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
   }
 
   private synchronized void failoverToReconstructionRead(
-      DatanodeDetails badLocation, long lastPosition) throws IOException {
-    if (badLocation != null) {
-      failedLocations.add(badLocation);
+      List<DatanodeDetails> badLocations, long lastPosition)
+      throws IOException {
+    if (badLocations != null) {
+      failedLocations.addAll(badLocations);
     }
     blockReader.close();
     reconstructionReader = true;
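
On the proxy side, the failover now excludes every replica the block reader
reported as bad, not just one. A compact sketch of that behaviour (JDK-only
stand-ins, not the Ozone types):

import java.util.ArrayList;
import java.util.List;

class FailoverSketch {
  private final List<String> excludedNodes = new ArrayList<>();
  private boolean reconstructionRead;

  // Previously a single node was recorded here; now the whole list of
  // failed replicas is excluded before the reconstruction read starts.
  void failoverToReconstruction(List<String> badNodes) {
    if (badNodes != null) {
      excludedNodes.addAll(badNodes);
    }
    reconstructionRead = true;
  }
}
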
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index e354fc7179..59d3cd387b 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -393,7 +394,56 @@ public class TestECBlockInputStream {
       BadDataLocationException e =
           assertThrows(BadDataLocationException.class, () -> ecb.read(buf));
       Assertions.assertEquals(2,
-          keyInfo.getPipeline().getReplicaIndex(e.getFailedLocation()));
+          keyInfo.getPipeline().getReplicaIndex(e.getFailedLocations().get(0)));
+    }
+  }
+
+  @Test
+  public void testNoErrorIfSpareLocationToRead() throws IOException {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    Map<DatanodeDetails, Integer> datanodes = new LinkedHashMap<>();
+    for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+    }
+    // Add a second index = 1
+    datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+
+    BlockLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 8 * ONEMB, datanodes);
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      // Read a full stripe to ensure all streams are created in the stream
+      // factory
+      ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+      int read = ecb.read(buf);
+      Assertions.assertEquals(3 * ONEMB, read);
+      // Now make replication index 1 error on the next read, but as there is
+      // a spare it should read from it with no errors
+      streamFactory.getBlockStreams().get(0).setThrowException(true);
+      buf.clear();
+      read = ecb.read(buf);
+      Assertions.assertEquals(3 * ONEMB, read);
+
+      // Now make the spare one error on the next read, and we should get an
+      // error with two failed locations. Each stream the reader opens gets a
+      // corresponding entry in the stream factory. Our read will read from
+      // DNs with EC indexes 1 - 3 first, creating streams 0 to 2. Then when
+      // stream(0) is failed for index=1, a new stream is created for the
+      // alternative index=1 at stream(3). Hence, to make it error we set
+      // stream(3) to throw as below.
+      streamFactory.getBlockStreams().get(3).setThrowException(true);
+      buf.clear();
+      BadDataLocationException e =
+          assertThrows(BadDataLocationException.class, () -> ecb.read(buf));
+      List<DatanodeDetails> failed = e.getFailedLocations();
+      // Expect 2 different DNs reported as failure
+      Assertions.assertEquals(2, failed.size());
+      Assertions.assertNotEquals(failed.get(0), failed.get(1));
+      // Both failures should map to index = 1.
+      for (DatanodeDetails dn : failed) {
+        Assertions.assertEquals(1, datanodes.get(dn));
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org