You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2023/02/08 09:26:27 UTC
[ozone] branch master updated: HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 11fd5b12fd HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)
11fd5b12fd is described below
commit 11fd5b12fd73b0d47dcf53a91346d842658c56d7
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Feb 8 09:26:20 2023 +0000
HDDS-7917. EC: ECBlockInputStream should try spare replicas on error (#4253)
---
.../ozone/client/io/BadDataLocationException.java | 32 +++++++++---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 60 ++++++++++++++++++++--
.../ozone/client/io/ECBlockInputStreamProxy.java | 9 ++--
.../ozone/client/io/TestECBlockInputStream.java | 52 ++++++++++++++++++-
4 files changed, 138 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
index dc8f7b09c0..65e124eaf5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/**
* Exception used to indicate a problem with a specific block location, allowing
@@ -27,30 +29,46 @@ import java.io.IOException;
*/
public class BadDataLocationException extends IOException {
- private DatanodeDetails failedLocation;
+ private List<DatanodeDetails> failedLocations = new ArrayList<>();
+ private int failedLocationIndex;
public BadDataLocationException(DatanodeDetails dn) {
super();
- failedLocation = dn;
+ failedLocations.add(dn);
}
public BadDataLocationException(DatanodeDetails dn, String message) {
super(message);
- failedLocation = dn;
+ failedLocations.add(dn);
}
public BadDataLocationException(DatanodeDetails dn, String message,
Throwable ex) {
super(message, ex);
- failedLocation = dn;
+ failedLocations.add(dn);
}
public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
super(ex);
- failedLocation = dn;
+ failedLocations.add(dn);
}
- public DatanodeDetails getFailedLocation() {
- return failedLocation;
+ public BadDataLocationException(DatanodeDetails dn, int failedIndex,
+ Throwable ex) {
+ super(ex);
+ failedLocations.add(dn);
+ failedLocationIndex = failedIndex;
+ }
+
+ public List<DatanodeDetails> getFailedLocations() {
+ return failedLocations;
+ }
+
+ public void addFailedLocations(List<DatanodeDetails> dns) {
+ failedLocations.addAll(dns);
+ }
+
+ public int getFailedLocationIndex() {
+ return failedLocationIndex;
}
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index ed57ea5a1e..5f491c4d5a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -36,8 +36,14 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.function.Function;
/**
@@ -58,6 +64,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final BlockLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
private final BlockExtendedInputStream[] blockStreams;
+ private final Map<Integer, LinkedList<DatanodeDetails>> spareDataLocations
+ = new HashMap<>();
+ private final List<DatanodeDetails> failedLocations = new ArrayList<>();
private final int maxLocations;
private long position = 0;
@@ -263,7 +272,13 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
throw new IndexOutOfBoundsException("The index " + index + " is greater "
+ "than the EC Replication Config (" + repConfig + ")");
}
- dataLocations[index - 1] = location;
+ int arrayIndex = index - 1;
+ if (dataLocations[arrayIndex] == null) {
+ dataLocations[arrayIndex] = location;
+ } else {
+ spareDataLocations.computeIfAbsent(arrayIndex,
+ k -> new LinkedList<>()).add(location);
+ }
}
protected long blockLength() {
@@ -274,6 +289,45 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
return blockLength() - position;
}
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+ while (true) {
+ int currentBufferPosition = byteBuffer.position();
+ long currentPosition = getPos();
+ try {
+ return super.read(byteBuffer);
+ } catch (BadDataLocationException e) {
+ int failedIndex = e.getFailedLocationIndex();
+ if (shouldRetryFailedRead(failedIndex)) {
+ byteBuffer.position(currentBufferPosition);
+ seek(currentPosition);
+ } else {
+ e.addFailedLocations(failedLocations);
+ throw e;
+ }
+ }
+ }
+ }
+
+ private boolean shouldRetryFailedRead(int failedIndex) throws IOException {
+ LinkedList<DatanodeDetails> spare = spareDataLocations.get(failedIndex);
+ if (spare != null && spare.size() > 0) {
+ failedLocations.add(dataLocations[failedIndex]);
+ dataLocations[failedIndex] = spare.removeFirst();
+ if (blockStreams[failedIndex] != null) {
+ blockStreams[failedIndex].close();
+ blockStreams[failedIndex] = null;
+ }
+ return true;
+ }
+ return false;
+ }
+
/**
* Read from the internal BlockInputStreams one EC cell at a time into the
* strategy buffer. This call may read from several internal BlockInputStreams
@@ -294,15 +348,15 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
int totalRead = 0;
while (strategy.getTargetLength() > 0 && remaining() > 0) {
+ int currentIndex = currentStreamIndex();
try {
- int currentIndex = currentStreamIndex();
BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
int read = readFromStream(stream, strategy);
totalRead += read;
position += read;
} catch (IOException ioe) {
throw new BadDataLocationException(
- dataLocations[currentStreamIndex()], ioe);
+ dataLocations[currentIndex], currentIndex, ioe);
}
}
return totalRead;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index c1356230b2..47758eae02 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -182,7 +182,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
}
failoverToReconstructionRead(
- ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+ ((BadDataLocationException) e).getFailedLocations(), lastPosition);
buf.reset();
totalRead += read(buf);
} else {
@@ -193,9 +193,10 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
}
private synchronized void failoverToReconstructionRead(
- DatanodeDetails badLocation, long lastPosition) throws IOException {
- if (badLocation != null) {
- failedLocations.add(badLocation);
+ List<DatanodeDetails> badLocations, long lastPosition)
+ throws IOException {
+ if (badLocations != null) {
+ failedLocations.addAll(badLocations);
}
blockReader.close();
reconstructionReader = true;
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index e354fc7179..59d3cd387b 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -40,6 +40,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -393,7 +394,56 @@ public class TestECBlockInputStream {
BadDataLocationException e =
assertThrows(BadDataLocationException.class, () -> ecb.read(buf));
Assertions.assertEquals(2,
- keyInfo.getPipeline().getReplicaIndex(e.getFailedLocation()));
+ keyInfo.getPipeline().getReplicaIndex(e.getFailedLocations().get(0)));
+ }
+ }
+
+ @Test
+ public void testNoErrorIfSpareLocationToRead() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ Map<DatanodeDetails, Integer> datanodes = new LinkedHashMap<>();
+ for (int i = 1; i <= repConfig.getRequiredNodes(); i++) {
+ datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+ }
+ // Add a second index = 1
+ datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+
+ BlockLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 8 * ONEMB, datanodes);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ // Read a full stripe to ensure all streams are created in the stream
+ // factory
+ ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+ int read = ecb.read(buf);
+ Assertions.assertEquals(3 * ONEMB, read);
+ // Now make replication index 1 error on the next read but as there is a
+ // spare it should read from it with no errors
+ streamFactory.getBlockStreams().get(0).setThrowException(true);
+ buf.clear();
+ read = ecb.read(buf);
+ Assertions.assertEquals(3 * ONEMB, read);
+
+ // Now make the spare one error on the next read, and we should get an
+ // error with two failed locations. As each stream is created, a new
+ // stream will be created in the stream factory. Our read will read from
+ // DNs with EC indexes 1 - 3 first, creating streams 0 to 2. Then when
+ // stream(0) is failed for index=1 a new stream is created for the
+ // alternative index=1 at stream(3). Hence, to make it error we set
+ // stream(3) to throw as below.
+ streamFactory.getBlockStreams().get(3).setThrowException(true);
+ buf.clear();
+ BadDataLocationException e =
+ assertThrows(BadDataLocationException.class, () -> ecb.read(buf));
+ List<DatanodeDetails> failed = e.getFailedLocations();
+ // Expect 2 different DNs reported as failure
+ Assertions.assertEquals(2, failed.size());
+ Assertions.assertNotEquals(failed.get(0), failed.get(1));
+ // Both failures should map to index = 1.
+ for (DatanodeDetails dn : failed) {
+ Assertions.assertEquals(1, datanodes.get(dn));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org