Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/04/13 02:46:05 UTC
[hadoop] branch branch-3.2 updated: HDFS-15759. EC: Verify EC
reconstruction correctness on DataNode (#2585) (#2868)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0faa626 HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585) (#2868)
0faa626 is described below
commit 0faa626bd84a02688e0938de1320fc32ec550aea
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Apr 12 19:45:49 2021 -0700
HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585) (#2868)
(cherry picked from commit 95e68926750b55196cf9da53c25359c98ef58a4f)
(cherry picked from commit 6cfa01c905ca422c3c6b2d1d735dd16ee08b4fb4)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
Co-authored-by: touchida <56...@users.noreply.github.com>
---
.../io/erasurecode/rawcoder/DecodingValidator.java | 187 ++++++++++++++++
.../rawcoder/InvalidDecodingException.java | 35 +++
.../hadoop-common/src/site/markdown/Metrics.md | 1 +
.../hadoop/io/erasurecode/TestCoderBase.java | 12 ++
.../rawcoder/TestDecodingValidator.java | 237 +++++++++++++++++++++
.../io/erasurecode/rawcoder/TestRawCoderBase.java | 2 +-
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../server/datanode/DataNodeFaultInjector.java | 7 +
.../StripedBlockChecksumReconstructor.java | 12 +-
.../erasurecode/StripedBlockReconstructor.java | 26 ++-
.../datanode/erasurecode/StripedReconstructor.java | 46 +++-
.../server/datanode/metrics/DataNodeMetrics.java | 10 +
.../src/main/resources/hdfs-default.xml | 10 +
.../hadoop/hdfs/TestReconstructStripedFile.java | 24 ++-
.../TestReconstructStripedFileWithValidator.java | 115 ++++++++++
15 files changed, 720 insertions(+), 7 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java
new file mode 100644
index 0000000..235ef80
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A utility class to validate decoding.
+ */
+@InterfaceAudience.Private
+public class DecodingValidator {
+
+ private final RawErasureDecoder decoder;
+ private ByteBuffer buffer;
+ private int[] newValidIndexes;
+ private int newErasedIndex;
+
+ public DecodingValidator(RawErasureDecoder decoder) {
+ this.decoder = decoder;
+ }
+
+ /**
+ * Validate outputs decoded from inputs, by decoding an input back from
+ * the outputs and comparing it with the original one.
+ *
+ * For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources
+ * and (p0, p1, p2) be parities, and assume
+ * inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)];
+ * erasedIndexes = [1, 6];
+ * outputs = [d1, p0].
+ * Then
+ * 1. Create new inputs, erasedIndexes and outputs for validation so that
+ * the inputs could contain the decoded outputs, and decode them:
+ * newInputs = [d1, d2, d3, d4, d5, p0]
+ * newErasedIndexes = [0]
+ * newOutputs = [d0']
+ * 2. Compare d0 and d0'. The comparison will fail with high probability
+ * when the initial outputs are wrong.
+ *
+ * Note that the input buffers' positions must be the ones from which the
+ * data are read: if the input buffers have been processed by a decoder,
+ * their positions must be reset before being passed into this method.
+ *
+ * This method does not change outputs and erasedIndexes.
+ *
+ * @param inputs input buffers used for decoding. The buffers' positions
+ * are moved to the end after this method returns.
+ * @param erasedIndexes indexes of erased units used for decoding
+ * @param outputs decoded output buffers, which are ready to be read after
+ * the call
+ * @throws IOException
+ */
+ public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
+ ByteBuffer[] outputs) throws IOException {
+ markBuffers(outputs);
+
+ try {
+ ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
+ boolean isDirect = validInput.isDirect();
+ int capacity = validInput.capacity();
+ int remaining = validInput.remaining();
+
+ // Init buffer
+ if (buffer == null || buffer.isDirect() != isDirect
+ || buffer.capacity() < remaining) {
+ buffer = allocateBuffer(isDirect, capacity);
+ }
+ buffer.clear().limit(remaining);
+
+ // Create newInputs and newErasedIndex for validation
+ ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
+ int count = 0;
+ for (int i = 0; i < erasedIndexes.length; i++) {
+ newInputs[erasedIndexes[i]] = outputs[i];
+ count++;
+ }
+ newErasedIndex = -1;
+ boolean selected = false;
+ int numValidIndexes = CoderUtil.getValidIndexes(inputs).length;
+ for (int i = 0; i < newInputs.length; i++) {
+ if (count == numValidIndexes) {
+ break;
+ } else if (!selected && inputs[i] != null) {
+ newErasedIndex = i;
+ newInputs[i] = null;
+ selected = true;
+ } else if (newInputs[i] == null) {
+ newInputs[i] = inputs[i];
+ if (inputs[i] != null) {
+ count++;
+ }
+ }
+ }
+
+ // Keep it for testing
+ newValidIndexes = CoderUtil.getValidIndexes(newInputs);
+
+ decoder.decode(newInputs, new int[]{newErasedIndex},
+ new ByteBuffer[]{buffer});
+
+ if (!buffer.equals(inputs[newErasedIndex])) {
+ throw new InvalidDecodingException("Failed to validate decoding");
+ }
+ } finally {
+ toLimits(inputs);
+ resetBuffers(outputs);
+ }
+ }
+
+ /**
+ * Validate outputs decoded from inputs, by decoding an input back from
+ * those outputs and comparing it with the original one.
+ * @param inputs input buffers used for decoding
+ * @param erasedIndexes indexes of erased units used for decoding
+ * @param outputs decoded output buffers
+ * @throws IOException
+ */
+ public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
+ throws IOException {
+ ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
+ ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
+ validate(newInputs, erasedIndexes, newOutputs);
+ }
+
+ private ByteBuffer allocateBuffer(boolean direct, int capacity) {
+ if (direct) {
+ buffer = ByteBuffer.allocateDirect(capacity);
+ } else {
+ buffer = ByteBuffer.allocate(capacity);
+ }
+ return buffer;
+ }
+
+ private static void markBuffers(ByteBuffer[] buffers) {
+ for (ByteBuffer buffer: buffers) {
+ if (buffer != null) {
+ buffer.mark();
+ }
+ }
+ }
+
+ private static void resetBuffers(ByteBuffer[] buffers) {
+ for (ByteBuffer buffer: buffers) {
+ if (buffer != null) {
+ buffer.reset();
+ }
+ }
+ }
+
+ private static void toLimits(ByteBuffer[] buffers) {
+ for (ByteBuffer buffer: buffers) {
+ if (buffer != null) {
+ buffer.position(buffer.limit());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ protected int[] getNewValidIndexes() {
+ return newValidIndexes;
+ }
+
+ @VisibleForTesting
+ protected int getNewErasedIndex() {
+ return newErasedIndex;
+ }
+}
\ No newline at end of file
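As a usage note, here is a minimal sketch (illustrative only, not part of the patch) of how the new DecodingValidator could be driven against a raw RS(6,3) decoder, following the buffer-position contract described in the validate() javadoc above. The class name and the bare Configuration are assumptions made for the sketch; "rs" is the built-in Reed-Solomon codec name. Note that the DataNode only enables validation when the coder does not modify its inputs (coderOptions.allowChangeInputs() is false), as shown in the StripedReconstructor change below.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

public class DecodingValidationSketch {

  /**
   * Decode the erased units and then verify the result with the new
   * DecodingValidator. The caller prepares the buffers exactly as for a
   * plain decode() call.
   */
  public static void decodeAndValidate(ByteBuffer[] inputs,
      int[] erasedIndexes, ByteBuffer[] outputs) throws IOException {
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3);
    RawErasureDecoder decoder =
        CodecUtil.createRawDecoder(new Configuration(), "rs", options);
    DecodingValidator validator = new DecodingValidator(decoder);

    // decode() advances the input buffers' positions, so remember them first.
    for (ByteBuffer input : inputs) {
      if (input != null) {
        input.mark();
      }
    }
    decoder.decode(inputs, erasedIndexes, outputs);

    // Restore the read positions before validating, as the javadoc requires.
    for (ByteBuffer input : inputs) {
      if (input != null) {
        input.reset();
      }
    }

    // Throws InvalidDecodingException (an IOException) if the decoded outputs
    // cannot reproduce one of the surviving inputs.
    validator.validate(inputs, erasedIndexes, outputs);
  }
}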
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java
new file mode 100644
index 0000000..37869f8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * Thrown for invalid decoding.
+ */
+@InterfaceAudience.Private
+public class InvalidDecodingException
+ extends IOException {
+ private static final long serialVersionUID = 0L;
+
+ public InvalidDecodingException(String description) {
+ super(description);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 0e48ca3..9cffedd 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -428,6 +428,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) |
| `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks |
| `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks |
+| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks |
| `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks |
| `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
| `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
index 6d14de8..331cecb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -527,4 +527,16 @@ public abstract class TestCoderBase {
buffer.position(buffer.position() + 1);
}
}
+
+ /**
+ * Pollute a randomly chosen chunk by changing one byte of its buffer.
+ * @param chunks chunks from which one is polluted
+ */
+ protected void polluteSomeChunk(ECChunk[] chunks) {
+ int idx = new Random().nextInt(chunks.length);
+ ByteBuffer buffer = chunks[idx].getBuffer();
+ buffer.mark();
+ buffer.put((byte) ((buffer.get(buffer.position()) + 1)));
+ buffer.reset();
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java
new file mode 100644
index 0000000..06744cc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link DecodingValidator} under various decoders.
+ */
+@RunWith(Parameterized.class)
+public class TestDecodingValidator extends TestRawCoderBase {
+
+ private DecodingValidator validator;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}},
+ {RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}},
+ {RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}},
+ {NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}},
+ {XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}},
+ {NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0},
+ new int[]{}}
+ });
+ }
+
+ public TestDecodingValidator(
+ Class<? extends RawErasureCoderFactory> factoryClass, int numDataUnits,
+ int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) {
+ this.encoderFactoryClass = factoryClass;
+ this.decoderFactoryClass = factoryClass;
+ this.numDataUnits = numDataUnits;
+ this.numParityUnits = numParityUnits;
+ this.erasedDataIndexes = erasedDataIndexes;
+ this.erasedParityIndexes = erasedParityIndexes;
+ }
+
+ @Before
+ public void setup() {
+ if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class
+ || encoderFactoryClass == NativeXORRawErasureCoderFactory.class) {
+ Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
+ }
+ setAllowDump(false);
+ }
+
+ /**
+ * Test if the same validator can process direct and non-direct buffers.
+ */
+ @Test
+ public void testValidate() {
+ prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
+ erasedParityIndexes);
+ testValidate(true);
+ testValidate(false);
+ }
+
+ /**
+ * Test if the same validator can process variable width of data for
+ * inputs and outputs.
+ */
+ protected void testValidate(boolean usingDirectBuffer) {
+ this.usingDirectBuffer = usingDirectBuffer;
+ prepareCoders(false);
+ prepareValidator(false);
+
+ performTestValidate(baseChunkSize);
+ performTestValidate(baseChunkSize - 17);
+ performTestValidate(baseChunkSize + 18);
+ }
+
+ protected void prepareValidator(boolean recreate) {
+ if (validator == null || recreate) {
+ validator = new DecodingValidator(decoder);
+ }
+ }
+
+ protected void performTestValidate(int chunkSize) {
+ setChunkSize(chunkSize);
+ prepareBufferAllocator(false);
+
+ // encode
+ ECChunk[] dataChunks = prepareDataChunksForEncoding();
+ ECChunk[] parityChunks = prepareParityChunksForEncoding();
+ ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+ try {
+ encoder.encode(dataChunks, parityChunks);
+ } catch (Exception e) {
+ Assert.fail("Should not get Exception: " + e.getMessage());
+ }
+
+ // decode
+ backupAndEraseChunks(clonedDataChunks, parityChunks);
+ ECChunk[] inputChunks =
+ prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
+ markChunks(inputChunks);
+ ensureOnlyLeastRequiredChunks(inputChunks);
+ ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+ int[] erasedIndexes = getErasedIndexesForDecoding();
+ try {
+ decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
+ } catch (Exception e) {
+ Assert.fail("Should not get Exception: " + e.getMessage());
+ }
+
+ // validate
+ restoreChunksFromMark(inputChunks);
+ ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks);
+ ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks);
+ int[] clonedErasedIndexes = erasedIndexes.clone();
+
+ try {
+ validator.validate(clonedInputChunks, clonedErasedIndexes,
+ clonedRecoveredChunks);
+ } catch (Exception e) {
+ Assert.fail("Should not get Exception: " + e.getMessage());
+ }
+
+ // Check if input buffers' positions are moved to the end
+ verifyBufferPositionAtEnd(clonedInputChunks);
+
+ // Check if validator does not change recovered chunks and erased indexes
+ verifyChunksEqual(recoveredChunks, clonedRecoveredChunks);
+ Assert.assertArrayEquals("Erased indexes should not be changed",
+ erasedIndexes, clonedErasedIndexes);
+
+ // Check if validator uses correct indexes for validation
+ List<Integer> validIndexesList =
+ IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed()
+ .collect(Collectors.toList());
+ List<Integer> newValidIndexesList =
+ IntStream.of(validator.getNewValidIndexes()).boxed()
+ .collect(Collectors.toList());
+ List<Integer> erasedIndexesList =
+ IntStream.of(erasedIndexes).boxed().collect(Collectors.toList());
+ int newErasedIndex = validator.getNewErasedIndex();
+ Assert.assertTrue(
+ "Valid indexes for validation should contain"
+ + " erased indexes for decoding",
+ newValidIndexesList.containsAll(erasedIndexesList));
+ Assert.assertTrue(
+ "An erased index for validation should be contained"
+ + " in valid indexes for decoding",
+ validIndexesList.contains(newErasedIndex));
+ Assert.assertFalse(
+ "An erased index for validation should not be contained"
+ + " in valid indexes for validation",
+ newValidIndexesList.contains(newErasedIndex));
+ }
+
+ private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) {
+ boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2));
+ assertTrue("Recovered chunks should not be changed", result);
+ }
+
+ /**
+ * Test if validator throws {@link InvalidDecodingException} when
+ * a decoded output buffer is polluted.
+ */
+ @Test
+ public void testValidateWithBadDecoding() throws IOException {
+ prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
+ erasedParityIndexes);
+ this.usingDirectBuffer = true;
+ prepareCoders(true);
+ prepareValidator(true);
+ prepareBufferAllocator(false);
+
+ // encode
+ ECChunk[] dataChunks = prepareDataChunksForEncoding();
+ ECChunk[] parityChunks = prepareParityChunksForEncoding();
+ ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+ try {
+ encoder.encode(dataChunks, parityChunks);
+ } catch (Exception e) {
+ Assert.fail("Should not get Exception: " + e.getMessage());
+ }
+
+ // decode
+ backupAndEraseChunks(clonedDataChunks, parityChunks);
+ ECChunk[] inputChunks =
+ prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
+ markChunks(inputChunks);
+ ensureOnlyLeastRequiredChunks(inputChunks);
+ ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+ int[] erasedIndexes = getErasedIndexesForDecoding();
+ try {
+ decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
+ } catch (Exception e) {
+ Assert.fail("Should not get Exception: " + e.getMessage());
+ }
+
+ // validate
+ restoreChunksFromMark(inputChunks);
+ polluteSomeChunk(recoveredChunks);
+ try {
+ validator.validate(inputChunks, erasedIndexes, recoveredChunks);
+ Assert.fail("Validation should fail due to bad decoding");
+ } catch (InvalidDecodingException e) {
+ String expected = "Failed to validate decoding";
+ GenericTestUtils.assertExceptionContains(expected, e);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
index 4519e35..eb63494 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -334,7 +334,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
verifyBufferPositionAtEnd(inputChunks);
}
- private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
+ void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
for (ECChunk chunk : inputChunks) {
if (chunk != null) {
Assert.assertEquals(0, chunk.getBuffer().remaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7f57f4c..4ce08cd 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -764,6 +764,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.ec.reconstruction.xmits.weight";
public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
0.5f;
+ public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY =
+ "dfs.datanode.ec.reconstruction.validation";
+ public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
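Usage-wise, the new key corresponds to the dfs.datanode.ec.reconstruction.validation property added to hdfs-default.xml further below and defaults to false. A rough sketch (illustrative, not from the patch) of enabling it programmatically, e.g. before starting a test cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EcValidationConfigSketch {

  /** Build a configuration with EC reconstruction validation turned on. */
  public static Configuration withEcValidation() {
    Configuration conf = new HdfsConfiguration();
    // Equivalent to setting dfs.datanode.ec.reconstruction.validation=true
    // in hdfs-site.xml; the default is false.
    conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
        true);
    return conf;
  }
}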
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index cf5cfe5..821717b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -127,4 +128,10 @@ public class DataNodeFaultInjector {
* Just delay a while.
*/
public void delay() {}
+
+ /**
+ * Used as a hook to inject data pollution
+ * into an erasure coding reconstruction.
+ */
+ public void badDecoding(ByteBuffer[] outputs) {}
}
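For reference, the hook is exercised by swapping in a custom injector around the code under test; the following trimmed sketch shows the pattern (the complete version added by this commit appears in TestReconstructStripedFileWithValidator at the end of the diff):

import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;

public class BadDecodingInjectorSketch {

  /** Run the given body with a badDecoding() injector installed. */
  public static void runWithBadDecoding(Runnable body) {
    DataNodeFaultInjector previous = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void badDecoding(ByteBuffer[] outputs) {
        // Flip one byte in each decoded output to simulate a bad decode.
        for (ByteBuffer output : outputs) {
          output.mark();
          output.put((byte) (output.get(output.position()) + 1));
          output.reset();
        }
      }
    });
    try {
      body.run();
    } finally {
      // Always restore the previous injector so other tests are unaffected.
      DataNodeFaultInjector.set(previous);
    }
  }
}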
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index e28d6c5..a196935 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public abstract class StripedBlockChecksumReconstructor
private void init() throws IOException {
initDecoderIfNecessary();
+ initDecodingValidatorIfNecessary();
getStripedReader().init();
// allocate buffer to keep the reconstructed block data
targetBuffer = allocateBuffer(getBufferSize());
@@ -192,7 +193,16 @@ public abstract class StripedBlockChecksumReconstructor
for (int i = 0; i < targetIndices.length; i++) {
tarIndices[i] = targetIndices[i];
}
- getDecoder().decode(inputs, tarIndices, outputs);
+
+ if (isValidationEnabled()) {
+ markBuffers(inputs);
+ getDecoder().decode(inputs, tarIndices, outputs);
+ resetBuffers(inputs);
+
+ getValidator().validate(inputs, tarIndices, outputs);
+ } else {
+ getDecoder().decode(inputs, tarIndices, outputs);
+ }
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 1af2380..cd59f51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException;
import org.apache.hadoop.util.Time;
/**
@@ -53,6 +54,8 @@ class StripedBlockReconstructor extends StripedReconstructor
try {
initDecoderIfNecessary();
+ initDecodingValidatorIfNecessary();
+
getStripedReader().init();
stripedWriter.init();
@@ -126,12 +129,31 @@ class StripedBlockReconstructor extends StripedReconstructor
int[] erasedIndices = stripedWriter.getRealTargetIndices();
ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
+ if (isValidationEnabled()) {
+ markBuffers(inputs);
+ decode(inputs, erasedIndices, outputs);
+ resetBuffers(inputs);
+
+ DataNodeFaultInjector.get().badDecoding(outputs);
+ try {
+ getValidator().validate(inputs, erasedIndices, outputs);
+ } catch (InvalidDecodingException e) {
+ getDatanode().getMetrics().incrECInvalidReconstructionTasks();
+ throw e;
+ }
+ } else {
+ decode(inputs, erasedIndices, outputs);
+ }
+
+ stripedWriter.updateRealTargetBuffers(toReconstructLen);
+ }
+
+ private void decode(ByteBuffer[] inputs, int[] erasedIndices,
+ ByteBuffer[] outputs) throws IOException {
long start = System.nanoTime();
getDecoder().decode(inputs, erasedIndices, outputs);
long end = System.nanoTime();
this.getDatanode().getMetrics().incrECDecodingTime(end - start);
-
- stripedWriter.updateRealTargetBuffers(toReconstructLen);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 48a0747..0b1cc4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,10 +105,14 @@ abstract class StripedReconstructor {
private final Configuration conf;
private final DataNode datanode;
private final ErasureCodingPolicy ecPolicy;
+ private final ErasureCoderOptions coderOptions;
private RawErasureDecoder decoder;
private final ExtendedBlock blockGroup;
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+ private final boolean isValidationEnabled;
+ private DecodingValidator validator;
+
// position in striped internal block
private long positionInBlock;
private StripedReader stripedReader;
@@ -136,6 +142,13 @@ abstract class StripedReconstructor {
cachingStrategy = CachingStrategy.newDefaultStrategy();
positionInBlock = 0L;
+
+ coderOptions = new ErasureCoderOptions(
+ ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+ isValidationEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
+ && !coderOptions.allowChangeInputs();
}
public void incrBytesRead(boolean local, long delta) {
@@ -196,13 +209,18 @@ abstract class StripedReconstructor {
// Initialize decoder
protected void initDecoderIfNecessary() {
if (decoder == null) {
- ErasureCoderOptions coderOptions = new ErasureCoderOptions(
- ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
coderOptions);
}
}
+ // Initialize decoding validator
+ protected void initDecodingValidatorIfNecessary() {
+ if (isValidationEnabled && validator == null) {
+ validator = new DecodingValidator(decoder);
+ }
+ }
+
long getPositionInBlock() {
return positionInBlock;
}
@@ -285,4 +303,28 @@ abstract class StripedReconstructor {
static ByteBufferPool getBufferPool() {
return BUFFER_POOL;
}
+
+ boolean isValidationEnabled() {
+ return isValidationEnabled;
+ }
+
+ DecodingValidator getValidator() {
+ return validator;
+ }
+
+ protected static void markBuffers(ByteBuffer[] buffers) {
+ for (ByteBuffer buffer: buffers) {
+ if (buffer != null) {
+ buffer.mark();
+ }
+ }
+ }
+
+ protected static void resetBuffers(ByteBuffer[] buffers) {
+ for (ByteBuffer buffer: buffers) {
+ if (buffer != null) {
+ buffer.reset();
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index b02fc1e..01731d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -145,6 +145,8 @@ public class DataNodeMetrics {
MutableCounterLong ecReconstructionTasks;
@Metric("Count of erasure coding failed reconstruction tasks")
MutableCounterLong ecFailedReconstructionTasks;
+ @Metric("Count of erasure coding invalidated reconstruction tasks")
+ private MutableCounterLong ecInvalidReconstructionTasks;
@Metric("Nanoseconds spent by decoding tasks")
MutableCounterLong ecDecodingTimeNanos;
@Metric("Bytes read by erasure coding worker")
@@ -486,6 +488,14 @@ public class DataNodeMetrics {
ecFailedReconstructionTasks.incr();
}
+ public void incrECInvalidReconstructionTasks() {
+ ecInvalidReconstructionTasks.incr();
+ }
+
+ public long getECInvalidReconstructionTasks() {
+ return ecInvalidReconstructionTasks.value();
+ }
+
public void incrDataNodeActiveXceiversCount() {
dataNodeActiveXceiversCount.incr();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 09c4f54..d413ed1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3318,6 +3318,16 @@
</property>
<property>
+ <name>dfs.datanode.ec.reconstruction.validation</name>
+ <value>false</value>
+ <description>
+ Decide if the datanode validates that EC reconstruction tasks reconstruct
+ target blocks correctly. When validation fails, a reconstruction task
+ fails and is retried by the namenode.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 16ce0dd..5b639a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -107,6 +107,23 @@ public class TestReconstructStripedFile {
return StripedFileTestUtil.getDefaultECPolicy();
}
+ public boolean isValidationEnabled() {
+ return false;
+ }
+
+ public int getPendingTimeout() {
+ return DFSConfigKeys
+ .DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ public MiniDFSCluster getCluster() {
+ return cluster;
+ }
+
@Before
public void setup() throws IOException {
ecPolicy = getEcPolicy();
@@ -130,6 +147,11 @@ public class TestReconstructStripedFile {
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.CODER_NAME);
}
+ conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+ getPendingTimeout());
+ conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+ isValidationEnabled());
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum)
.build();
@@ -305,7 +327,7 @@ public class TestReconstructStripedFile {
* and verify the block replica length, generationStamp and content.
* 2. Read the file and verify content.
*/
- private void assertFileBlocksReconstruction(String fileName, int fileLen,
+ void assertFileBlocksReconstruction(String fileName, int fileLen,
ReconstructionType type, int toRecoverBlockNum) throws Exception {
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java
new file mode 100644
index 0000000..00749ef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This test extends {@link TestReconstructStripedFile} to test
+ * EC reconstruction validation.
+ */
+public class TestReconstructStripedFileWithValidator
+ extends TestReconstructStripedFile {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class);
+
+ public TestReconstructStripedFileWithValidator() {
+ LOG.info("run {} with validator.",
+ TestReconstructStripedFileWithValidator.class.getSuperclass()
+ .getSimpleName());
+ }
+
+ /**
+ * This test injects data pollution into decoded outputs once.
+ * When validation is enabled, the first reconstruction task should fail
+ * validation, but the data will be recovered correctly
+ * by the next task.
+ * On the other hand, when validation is disabled, the first reconstruction
+ * task will succeed and then lead to data corruption.
+ */
+ @Test(timeout = 120000)
+ public void testValidatorWithBadDecoding()
+ throws Exception {
+ MiniDFSCluster cluster = getCluster();
+
+ cluster.getDataNodes().stream()
+ .map(DataNode::getMetrics)
+ .map(DataNodeMetrics::getECInvalidReconstructionTasks)
+ .forEach(n -> Assert.assertEquals(0, (long) n));
+
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+ DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() {
+ private final AtomicBoolean flag = new AtomicBoolean(false);
+
+ @Override
+ public void badDecoding(ByteBuffer[] outputs) {
+ if (!flag.get()) {
+ for (ByteBuffer output : outputs) {
+ output.mark();
+ output.put((byte) (output.get(output.position()) + 1));
+ output.reset();
+ }
+ }
+ flag.set(true);
+ }
+ };
+ DataNodeFaultInjector.set(badDecodingInjector);
+
+ int fileLen =
+ (getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits())
+ * getBlockSize() + getBlockSize() / 10;
+ try {
+ assertFileBlocksReconstruction(
+ "/testValidatorWithBadDecoding",
+ fileLen,
+ ReconstructionType.DataOnly,
+ getEcPolicy().getNumParityUnits());
+
+ long sum = cluster.getDataNodes().stream()
+ .map(DataNode::getMetrics)
+ .mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks)
+ .sum();
+ Assert.assertEquals(1, sum);
+ } finally {
+ DataNodeFaultInjector.set(oldInjector);
+ }
+ }
+
+ @Override
+ public boolean isValidationEnabled() {
+ return true;
+ }
+
+ /**
+ * Set a small timeout so that a failed reconstruction task is
+ * rescheduled within a short period of time.
+ */
+ @Override
+ public int getPendingTimeout() {
+ return 10;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org