You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/23 21:15:01 UTC

[28/50] [abbrv] hadoop git commit: HADOOP-11541. Raw XOR coder

HADOOP-11541. Raw XOR coder


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aab05c40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aab05c40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aab05c40

Branch: refs/heads/HDFS-7285
Commit: aab05c40ad6869142df48b6985964207add0e0df
Parents: 0119597
Author: Kai Zheng <dr...@apache.org>
Authored: Sun Feb 8 01:40:27 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 23 11:06:45 2015 -0700

----------------------------------------------------------------------
 .../io/erasurecode/rawcoder/XorRawDecoder.java  |  81 ++++++
 .../io/erasurecode/rawcoder/XorRawEncoder.java  |  61 +++++
 .../hadoop/io/erasurecode/TestCoderBase.java    | 262 +++++++++++++++++++
 .../erasurecode/rawcoder/TestRawCoderBase.java  |  96 +++++++
 .../erasurecode/rawcoder/TestXorRawCoder.java   |  52 ++++
 5 files changed, 552 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
new file mode 100644
index 0000000..98307a7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawDecoder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw decoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawDecoder extends AbstractRawErasureDecoder {
+
+  @Override
+  protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
+                          ByteBuffer[] outputs) {
+    assert(erasedIndexes.length == outputs.length);
+    assert(erasedIndexes.length <= 1);
+
+    int bufSize = inputs[0].remaining();
+    int erasedIdx = erasedIndexes[0];
+
+    // Set the output to zeros.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0].put(j, (byte) 0);
+    }
+
+    // Process the inputs.
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+      }
+    }
+  }
+
+  @Override
+  protected void doDecode(byte[][] inputs, int[] erasedIndexes,
+                          byte[][] outputs) {
+    assert(erasedIndexes.length == outputs.length);
+    assert(erasedIndexes.length <= 1);
+
+    int bufSize = inputs[0].length;
+    int erasedIdx = erasedIndexes[0];
+
+    // Set the output to zeros.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0][j] = 0;
+    }
+
+    // Process the inputs.
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0][j] ^= inputs[i][j];
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
new file mode 100644
index 0000000..99b20b9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XorRawEncoder.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw encoder in XOR code scheme in pure Java, adapted from HDFS-RAID.
+ */
+public class XorRawEncoder extends AbstractRawErasureEncoder {
+
+  @Override
+  protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+    int bufSize = inputs[0].remaining();
+
+    // Get the first buffer's data.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0].put(j, inputs[0].get(j));
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0].put(j, (byte) (outputs[0].get(j) ^ inputs[i].get(j)));
+      }
+    }
+  }
+
+  @Override
+  protected void doEncode(byte[][] inputs, byte[][] outputs) {
+    int bufSize = inputs[0].length;
+
+    // Get the first buffer's data.
+    for (int j = 0; j < bufSize; j++) {
+      outputs[0][j] = inputs[0][j];
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (int j = 0; j < bufSize; j++) {
+        outputs[0][j] ^= inputs[i][j];
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
new file mode 100644
index 0000000..9482b43
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test base of common utilities for tests not only raw coders but also block
+ * coders.
+ */
+public abstract class TestCoderBase {
+  protected static Random RAND = new Random();
+
+  protected int numDataUnits;
+  protected int numParityUnits;
+  protected int chunkSize = 16 * 1024;
+
+  // Indexes of erased data units. Will also support test of erasing
+  // parity units
+  protected int[] erasedDataIndexes = new int[] {0};
+
+  // Data buffers are either direct or on-heap, for performance the two cases
+  // may go to different coding implementations.
+  protected boolean usingDirectBuffer = true;
+
+  /**
+   * Compare and verify if erased chunks are equal to recovered chunks
+   * @param erasedChunks
+   * @param recoveredChunks
+   */
+  protected void compareAndVerify(ECChunk[] erasedChunks,
+                                  ECChunk[] recoveredChunks) {
+    byte[][] erased = ECChunk.toArray(erasedChunks);
+    byte[][] recovered = ECChunk.toArray(recoveredChunks);
+    for (int i = 0; i < erasedChunks.length; ++i) {
+      assertArrayEquals("Decoding and comparing failed.", erased[i],
+          recovered[i]);
+    }
+  }
+
+  /**
+   * Adjust and return erased indexes based on the array of the input chunks (
+   * parity chunks + data chunks).
+   * @return
+   */
+  protected int[] getErasedIndexesForDecoding() {
+    int[] erasedIndexesForDecoding = new int[erasedDataIndexes.length];
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      erasedIndexesForDecoding[i] = erasedDataIndexes[i] + numParityUnits;
+    }
+    return erasedIndexesForDecoding;
+  }
+
+  /**
+   * Return input chunks for decoding, which is parityChunks + dataChunks.
+   * @param dataChunks
+   * @param parityChunks
+   * @return
+   */
+  protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
+                                                  ECChunk[] parityChunks) {
+    ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
+    
+    int idx = 0;
+    for (int i = 0; i < numParityUnits; i++) {
+      inputChunks[idx ++] = parityChunks[i];
+    }
+    for (int i = 0; i < numDataUnits; i++) {
+      inputChunks[idx ++] = dataChunks[i];
+    }
+    
+    return inputChunks;
+  }
+
+  /**
+   * Have a copy of the data chunks that's to be erased thereafter. The copy
+   * will be used to compare and verify with the to be recovered chunks.
+   * @param dataChunks
+   * @return
+   */
+  protected ECChunk[] copyDataChunksToErase(ECChunk[] dataChunks) {
+    ECChunk[] copiedChunks = new ECChunk[erasedDataIndexes.length];
+
+    int j = 0;
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      copiedChunks[j ++] = cloneChunkWithData(dataChunks[erasedDataIndexes[i]]);
+    }
+
+    return copiedChunks;
+  }
+
+  /**
+   * Erase some data chunks to test the recovering of them
+   * @param dataChunks
+   */
+  protected void eraseSomeDataBlocks(ECChunk[] dataChunks) {
+    for (int i = 0; i < erasedDataIndexes.length; ++i) {
+      eraseDataFromChunk(dataChunks[erasedDataIndexes[i]]);
+    }
+  }
+
+  /**
+   * Erase data from the specified chunks, putting ZERO bytes to the buffers.
+   * @param chunks
+   */
+  protected void eraseDataFromChunks(ECChunk[] chunks) {
+    for (int i = 0; i < chunks.length; ++i) {
+      eraseDataFromChunk(chunks[i]);
+    }
+  }
+
+  /**
+   * Erase data from the specified chunk, putting ZERO bytes to the buffer.
+   * @param chunk
+   */
+  protected void eraseDataFromChunk(ECChunk chunk) {
+    ByteBuffer chunkBuffer = chunk.getBuffer();
+    // erase the data
+    chunkBuffer.position(0);
+    for (int i = 0; i < chunkSize; ++i) {
+      chunkBuffer.put((byte) 0);
+    }
+    chunkBuffer.flip();
+  }
+
+  /**
+   * Clone chunks along with copying the associated data. It respects how the
+   * chunk buffer is allocated, direct or non-direct. It avoids affecting the
+   * original chunk buffers.
+   * @param chunks
+   * @return
+   */
+  protected static ECChunk[] cloneChunksWithData(ECChunk[] chunks) {
+    ECChunk[] results = new ECChunk[chunks.length];
+    for (int i = 0; i < chunks.length; ++i) {
+      results[i] = cloneChunkWithData(chunks[i]);
+    }
+
+    return results;
+  }
+
+  /**
+   * Clone chunk along with copying the associated data. It respects how the
+   * chunk buffer is allocated, direct or non-direct. It avoids affecting the
+   * original chunk.
+   * @param chunk
+   * @return a new chunk
+   */
+  protected static ECChunk cloneChunkWithData(ECChunk chunk) {
+    ByteBuffer srcBuffer = chunk.getBuffer();
+    ByteBuffer destBuffer;
+
+    byte[] bytesArr = new byte[srcBuffer.remaining()];
+    srcBuffer.mark();
+    srcBuffer.get(bytesArr);
+    srcBuffer.reset();
+
+    if (srcBuffer.hasArray()) {
+      destBuffer = ByteBuffer.wrap(bytesArr);
+    } else {
+      destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
+      destBuffer.put(bytesArr);
+      destBuffer.flip();
+    }
+
+    return new ECChunk(destBuffer);
+  }
+
+  /**
+   * Allocate a chunk for output or writing.
+   * @return
+   */
+  protected ECChunk allocateOutputChunk() {
+    ByteBuffer buffer = allocateOutputBuffer();
+
+    return new ECChunk(buffer);
+  }
+
+  /**
+   * Allocate a buffer for output or writing.
+   * @return
+   */
+  protected ByteBuffer allocateOutputBuffer() {
+    ByteBuffer buffer = usingDirectBuffer ?
+        ByteBuffer.allocateDirect(chunkSize) : ByteBuffer.allocate(chunkSize);
+
+    return buffer;
+  }
+
+  /**
+   * Prepare data chunks for each data unit, by generating random data.
+   * @return
+   */
+  protected ECChunk[] prepareDataChunksForEncoding() {
+    ECChunk[] chunks = new ECChunk[numDataUnits];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = generateDataChunk();
+    }
+
+    return chunks;
+  }
+
+  /**
+   * Generate data chunk by making random data.
+   * @return
+   */
+  protected ECChunk generateDataChunk() {
+    ByteBuffer buffer = allocateOutputBuffer();
+    for (int i = 0; i < chunkSize; i++) {
+      buffer.put((byte) RAND.nextInt(256));
+    }
+    buffer.flip();
+
+    return new ECChunk(buffer);
+  }
+
+  /**
+   * Prepare parity chunks for encoding, each chunk for each parity unit.
+   * @return
+   */
+  protected ECChunk[] prepareParityChunksForEncoding() {
+    ECChunk[] chunks = new ECChunk[numParityUnits];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = allocateOutputChunk();
+    }
+
+    return chunks;
+  }
+
+  /**
+   * Prepare output chunks for decoding, each output chunk for each erased
+   * chunk.
+   * @return
+   */
+  protected ECChunk[] prepareOutputChunksForDecoding() {
+    ECChunk[] chunks = new ECChunk[erasedDataIndexes.length];
+    for (int i = 0; i < chunks.length; i++) {
+      chunks[i] = allocateOutputChunk();
+    }
+
+    return chunks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
new file mode 100644
index 0000000..9119211
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+import org.apache.hadoop.io.erasurecode.TestCoderBase;
+
+/**
+ * Raw coder test base with utilities.
+ */
+public abstract class TestRawCoderBase extends TestCoderBase {
+  protected Class<? extends RawErasureEncoder> encoderClass;
+  protected Class<? extends RawErasureDecoder> decoderClass;
+
+  /**
+   * Generating source data, encoding, recovering and then verifying.
+   * RawErasureCoder mainly uses ECChunk to pass input and output data buffers,
+   * it supports two kinds of ByteBuffers, one is array backed, the other is
+   * direct ByteBuffer. Have usingDirectBuffer to indicate which case to test.
+   * @param usingDirectBuffer
+   */
+  protected void testCoding(boolean usingDirectBuffer) {
+    // Generate data and encode
+    ECChunk[] dataChunks = prepareDataChunksForEncoding();
+    ECChunk[] parityChunks = prepareParityChunksForEncoding();
+    RawErasureEncoder encoder = createEncoder();
+
+    // Backup all the source chunks for later recovering because some coders
+    // may affect the source data.
+    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+    // Make a copy of a strip for later comparing
+    ECChunk[] toEraseDataChunks = copyDataChunksToErase(clonedDataChunks);
+
+    encoder.encode(dataChunks, parityChunks);
+    // Erase the copied sources
+    eraseSomeDataBlocks(clonedDataChunks);
+
+    //Decode
+    ECChunk[] inputChunks = prepareInputChunksForDecoding(clonedDataChunks,
+        parityChunks);
+    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+    RawErasureDecoder decoder = createDecoder();
+    decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
+
+    //Compare
+    compareAndVerify(toEraseDataChunks, recoveredChunks);
+  }
+
+  /**
+   * Create the raw erasure encoder to test
+   * @return
+   */
+  protected RawErasureEncoder createEncoder() {
+    RawErasureEncoder encoder;
+    try {
+      encoder = encoderClass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create encoder", e);
+    }
+
+    encoder.initialize(numDataUnits, numParityUnits, chunkSize);
+    return encoder;
+  }
+
+  /**
+   * create the raw erasure decoder to test
+   * @return
+   */
+  protected RawErasureDecoder createDecoder() {
+    RawErasureDecoder decoder;
+    try {
+      decoder = decoderClass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create decoder", e);
+    }
+
+    decoder.initialize(numDataUnits, numParityUnits, chunkSize);
+    return decoder;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aab05c40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
new file mode 100644
index 0000000..8e59b8a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXorRawCoder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Test XOR encoding and decoding.
+ */
+public class TestXorRawCoder extends TestRawCoderBase {
+  private static Random RAND = new Random();
+
+  @Before
+  public void setup() {
+    this.encoderClass = XorRawEncoder.class;
+    this.decoderClass = XorRawDecoder.class;
+
+    this.numDataUnits = 10;
+    this.numParityUnits = 1;
+
+    this.erasedDataIndexes = new int[] {0};
+  }
+
+  @Test
+  public void testCodingNoDirectBuffer() {
+    testCoding(false);
+  }
+
+  @Test
+  public void testCodingDirectBuffer() {
+    testCoding(true);
+  }
+
+}