You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/23 21:15:01 UTC
[28/50] [abbrv] hadoop git commit: HADOOP-11541. Raw XOR coder
HADOOP-11541. Raw XOR coder
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aab05c40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aab05c40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aab05c40
Branch: refs/heads/HDFS-7285
Commit: aab05c40ad6869142df48b6985964207add0e0df
Parents: 0119597
Author: Kai Zheng <dr...@apache.org>
Authored: Sun Feb 8 01:40:27 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 23 11:06:45 2015 -0700
----------------------------------------------------------------------
.../io/erasurecode/rawcoder/XorRawDecoder.java | 81 ++++++
.../io/erasurecode/rawcoder/XorRawEncoder.java | 61 +++++
.../hadoop/io/erasurecode/TestCoderBase.java | 262 +++++++++++++++++++
.../erasurecode/rawcoder/TestRawCoderBase.java | 96 +++++++
.../erasurecode/rawcoder/TestXorRawCoder.java | 52 ++++
5 files changed, 552 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
new file mode 100644
index 0000000..98307a7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawDecoder extends AbstractRawErasureDecoder {
+
+ @Override
+ protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
+ ByteBuffer[] outputs) {
+ assert(erasedIndexes.length == outputs.length);
+ assert(erasedIndexes.length <= 1);
+
+ int bufSize = inputs[0].remaining();
+ int erasedIdx = erasedIndexes[0];
+
+ // Set the output to zeros.
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0].put(j, (byte) 0);
+ }
+
+ // Process the inputs.
+ for (int i = 0; i < inputs.length; i++) {
+ // Skip the erased location.
+ if (i == erasedIdx) {
+ continue;
+ }
+
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+ }
+ }
+ }
+
+ @Override
+ protected void doDecode(byte[][] inputs, int[] erasedIndexes,
+ byte[][] outputs) {
+ assert(erasedIndexes.length == outputs.length);
+ assert(erasedIndexes.length <= 1);
+
+ int bufSize = inputs[0].length;
+ int erasedIdx = erasedIndexes[0];
+
+ // Set the output to zeros.
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0][j] = 0;
+ }
+
+ // Process the inputs.
+ for (int i = 0; i < inputs.length; i++) {
+ // Skip the erased location.
+ if (i == erasedIdx) {
+ continue;
+ }
+
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0][j] ^= inputs[i][j];
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
new file mode 100644
index 0000000..99b20b9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawEncoder extends AbstractRawErasureEncoder {
+
+ @Override
+ protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+ int bufSize = inputs[0].remaining();
+
+ // Get the first buffer's data.
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0].put(j, inputs[0].get(j));
+ }
+
+ // XOR with everything else.
+ for (int i = 1; i < inputs.length; i++) {
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+ }
+ }
+ }
+
+ @Override
+ protected void doEncode(byte[][] inputs, byte[][] outputs) {
+ int bufSize = inputs[0].length;
+
+ // Get the first buffer's data.
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0][j] = inputs[0][j];
+ }
+
+ // XOR with everything else.
+ for (int i = 1; i < inputs.length; i++) {
+ for (int j = 0; j < bufSize; j++) {
+ outputs[0][j] ^= inputs[i][j];
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
new file mode 100644
index 0000000..9482b43
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test base of common utilities for tests not only raw coders but also block
+ * coders.
+ */
+public abstract class TestCoderBase {
+ protected static Random RAND = new Random();
+
+ protected int numDataUnits;
+ protected int numParityUnits;
+ protected int chunkSize = 16 * 1024;
+
+ // Indexes of erased data units. Will also support test of erasing
+ // parity units
+ protected int[] erasedDataIndexes = new int[] {0};
+
+ // Data buffers are either direct or on-heap, for performance the two cases
+ // may go to different coding implementations.
+ protected boolean usingDirectBuffer = true;
+
+ /**
+ * Compare and verify if erased chunks are equal to recovered chunks
+ * @param erasedChunks
+ * @param recoveredChunks
+ */
+ protected void compareAndVerify(ECChunk[] erasedChunks,
+ ECChunk[] recoveredChunks) {
+ byte[][] erased = ECChunk.toArray(erasedChunks);
+ byte[][] recovered = ECChunk.toArray(recoveredChunks);
+ for (int i = 0; i < erasedChunks.length; ++i) {
+ assertArrayEquals("Decoding and comparing failed.", erased[i],
+ recovered[i]);
+ }
+ }
+
+ /**
+ * Adjust and return erased indexes based on the array of the input chunks (
+ * parity chunks + data chunks).
+ * @return
+ */
+ protected int[] getErasedIndexesForDecoding() {
+ int[] erasedIndexesForDecoding = new int[erasedDataIndexes.length];
+ for (int i = 0; i < erasedDataIndexes.length; ++i) {
+ erasedIndexesForDecoding[i] = erasedDataIndexes[i] + numParityUnits;
+ }
+ return erasedIndexesForDecoding;
+ }
+
+ /**
+ * Return input chunks for decoding, which is parityChunks + dataChunks.
+ * @param dataChunks
+ * @param parityChunks
+ * @return
+ */
+ protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
+ ECChunk[] parityChunks) {
+ ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
+
+ int idx = 0;
+ for (int i = 0; i < numParityUnits; i++) {
+ inputChunks[idx ++] = parityChunks[i];
+ }
+ for (int i = 0; i < numDataUnits; i++) {
+ inputChunks[idx ++] = dataChunks[i];
+ }
+
+ return inputChunks;
+ }
+
+ /**
+ * Have a copy of the data chunks that's to be erased thereafter. The copy
+ * will be used to compare and verify with the to be recovered chunks.
+ * @param dataChunks
+ * @return
+ */
+ protected ECChunk[] copyDataChunksToErase(ECChunk[] dataChunks) {
+ ECChunk[] copiedChunks = new ECChunk[erasedDataIndexes.length];
+
+ int j = 0;
+ for (int i = 0; i < erasedDataIndexes.length; ++i) {
+ copiedChunks[j ++] = cloneChunkWithData(dataChunks[erasedDataIndexes[i]]);
+ }
+
+ return copiedChunks;
+ }
+
+ /**
+ * Erase some data chunks to test the recovering of them
+ * @param dataChunks
+ */
+ protected void eraseSomeDataBlocks(ECChunk[] dataChunks) {
+ for (int i = 0; i < erasedDataIndexes.length; ++i) {
+ eraseDataFromChunk(dataChunks[erasedDataIndexes[i]]);
+ }
+ }
+
+ /**
+ * Erase data from the specified chunks, putting ZERO bytes to the buffers.
+ * @param chunks
+ */
+ protected void eraseDataFromChunks(ECChunk[] chunks) {
+ for (int i = 0; i < chunks.length; ++i) {
+ eraseDataFromChunk(chunks[i]);
+ }
+ }
+
+ /**
+ * Erase data from the specified chunk, putting ZERO bytes to the buffer.
+ * @param chunk
+ */
+ protected void eraseDataFromChunk(ECChunk chunk) {
+ ByteBuffer chunkBuffer = chunk.getBuffer();
+ // erase the data
+ chunkBuffer.position(0);
+ for (int i = 0; i < chunkSize; ++i) {
+ chunkBuffer.put((byte) 0);
+ }
+ chunkBuffer.flip();
+ }
+
+ /**
+ * Clone chunks along with copying the associated data. It respects how the
+ * chunk buffer is allocated, direct or non-direct. It avoids affecting the
+ * original chunk buffers.
+ * @param chunks
+ * @return
+ */
+ protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
+ ECChunk[] results = new ECChunk[chunks.length];
+ for (int i = 0; i < chunks.length; ++i) {
+ results[i] = cloneChunkWithData(chunks[i]);
+ }
+
+ return results;
+ }
+
+ /**
+ * Clone chunk along with copying the associated data. It respects how the
+ * chunk buffer is allocated, direct or non-direct. It avoids affecting the
+ * original chunk.
+ * @param chunk
+ * @return a new chunk
+ */
+ protected static ECChunk cloneChunkWithData(ECChunk chunk) {
+ ByteBuffer srcBuffer = chunk.getBuffer();
+ ByteBuffer destBuffer;
+
+ byte[] bytesArr = new byte[srcBuffer.remaining()];
+ srcBuffer.mark();
+ srcBuffer.get(bytesArr);
+ srcBuffer.reset();
+
+ if (srcBuffer.hasArray()) {
+ destBuffer = ByteBuffer.wrap(bytesArr);
+ } else {
+ destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
+ destBuffer.put(bytesArr);
+ destBuffer.flip();
+ }
+
+ return new ECChunk(destBuffer);
+ }
+
+ /**
+ * Allocate a chunk for output or writing.
+ * @return
+ */
+ protected ECChunk allocateOutputChunk() {
+ ByteBuffer buffer = allocateOutputBuffer();
+
+ return new ECChunk(buffer);
+ }
+
+ /**
+ * Allocate a buffer for output or writing.
+ * @return
+ */
+ protected ByteBuffer allocateOutputBuffer() {
+ ByteBuffer buffer = usingDirectBuffer ?
+ ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
+
+ return buffer;
+ }
+
+ /**
+ * Prepare data chunks for each data unit, by generating random data.
+ * @return
+ */
+ protected ECChunk[] prepareDataChunksForEncoding() {
+ ECChunk[] chunks = new ECChunk[numDataUnits];
+ for (int i = 0; i < chunks.length; i++) {
+ chunks[i] = generateDataChunk();
+ }
+
+ return chunks;
+ }
+
+ /**
+ * Generate data chunk by making random data.
+ * @return
+ */
+ protected ECChunk generateDataChunk() {
+ ByteBuffer buffer = allocateOutputBuffer();
+ for (int i = 0; i < chunkSize; i++) {
+ buffer.put((byte) RAND.nextInt(256));
+ }
+ buffer.flip();
+
+ return new ECChunk(buffer);
+ }
+
+ /**
+ * Prepare parity chunks for encoding, each chunk for each parity unit.
+ * @return
+ */
+ protected ECChunk[] prepareParityChunksForEncoding() {
+ ECChunk[] chunks = new ECChunk[numParityUnits];
+ for (int i = 0; i < chunks.length; i++) {
+ chunks[i] = allocateOutputChunk();
+ }
+
+ return chunks;
+ }
+
+ /**
+ * Prepare output chunks for decoding, each output chunk for each erased
+ * chunk.
+ * @return
+ */
+ protected ECChunk[] prepareOutputChunksForDecoding() {
+ ECChunk[] chunks = new ECChunk[erasedDataIndexes.length];
+ for (int i = 0; i < chunks.length; i++) {
+ chunks[i] = allocateOutputChunk();
+ }
+
+ return chunks;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
new file mode 100644
index 0000000..9119211
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.TestCoderBase;
+
+/**
+ * Raw coder test base with utilities.
+ */
+public abstract class TestRawCoderBase extends TestCoderBase {
+ protected Class<? extends RawErasureEncoder> encoderClass;
+ protected Class<? extends RawErasureDecoder> decoderClass;
+
+ /**
+ * Generating source data, encoding, recovering and then verifying.
+ * RawErasureCoder mainly uses ECChunk to pass input and output data buffers,
+ * it supports two kinds of ByteBuffers, one is array backed, the other is
+ * direct ByteBuffer. Have usingDirectBuffer to indicate which case to test.
+ * @param usingDirectBuffer
+ */
+ protected void testCoding(boolean usingDirectBuffer) {
+ // Generate data and encode
+ ECChunk[] dataChunks = prepareDataChunksForEncoding();
+ ECChunk[] parityChunks = prepareParityChunksForEncoding();
+ RawErasureEncoder encoder = createEncoder();
+
+ // Backup all the source chunks for later recovering because some coders
+ // may affect the source data.
+ ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+ // Make a copy of a strip for later comparing
+ ECChunk[] toEraseDataChunks = copyDataChunksToErase(clonedDataChunks);
+
+ encoder.encode(dataChunks, parityChunks);
+ // Erase the copied sources
+ eraseSomeDataBlocks(clonedDataChunks);
+
+ //Decode
+ ECChunk[] inputChunks = prepareInputChunksForDecoding(clonedDataChunks,
+ parityChunks);
+ ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+ RawErasureDecoder decoder = createDecoder();
+ decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
+
+ //Compare
+ compareAndVerify(toEraseDataChunks, recoveredChunks);
+ }
+
+ /**
+ * Create the raw erasure encoder to test
+ * @return
+ */
+ protected RawErasureEncoder createEncoder() {
+ RawErasureEncoder encoder;
+ try {
+ encoder = encoderClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create encoder", e);
+ }
+
+ encoder.initialize(numDataUnits, numParityUnits, chunkSize);
+ return encoder;
+ }
+
+ /**
+ * create the raw erasure decoder to test
+ * @return
+ */
+ protected RawErasureDecoder createDecoder() {
+ RawErasureDecoder decoder;
+ try {
+ decoder = decoderClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create decoder", e);
+ }
+
+ decoder.initialize(numDataUnits, numParityUnits, chunkSize);
+ return decoder;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
new file mode 100644
index 0000000..8e59b8a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Test XOR encoding and decoding.
+ */
+public class TestXorRawCoder extends TestRawCoderBase {
+ private static Random RAND = new Random();
+
+ @Before
+ public void setup() {
+ this.encoderClass = XorRawEncoder.class;
+ this.decoderClass = XorRawDecoder.class;
+
+ this.numDataUnits = 10;
+ this.numParityUnits = 1;
+
+ this.erasedDataIndexes = new int[] {0};
+ }
+
+ @Test
+ public void testCodingNoDirectBuffer() {
+ testCoding(false);
+ }
+
+ @Test
+ public void testCodingDirectBuffer() {
+ testCoding(true);
+ }
+
+}