Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2016/01/06 01:32:35 UTC

hadoop git commit: HADOOP-12685. Input buffer position after encode/decode not consistent between different kinds of buffers. Contributed by Rui Li.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 355c0ce72 -> c52b407cb


HADOOP-12685. Input buffer position after encode/decode not consistent between different kinds of buffers. Contributed by Rui Li.

Change-Id: I713c7b4e3cfae70c04b7e4b292ab53eae348d8d9


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c52b407c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c52b407c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c52b407c

Branch: refs/heads/trunk
Commit: c52b407cbffc8693738b31c6cc4e71751efd70e8
Parents: 355c0ce
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue Jan 5 16:31:52 2016 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Tue Jan 5 16:32:18 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../rawcoder/AbstractRawErasureDecoder.java     | 55 ++++++++++---------
 .../rawcoder/AbstractRawErasureEncoder.java     | 56 +++++++++++---------
 .../erasurecode/rawcoder/RawErasureDecoder.java |  5 +-
 .../erasurecode/rawcoder/RawErasureEncoder.java |  5 +-
 .../io/erasurecode/rawcoder/TestRSRawCoder.java |  7 +++
 .../erasurecode/rawcoder/TestRawCoderBase.java  | 34 ++++++++++++
 7 files changed, 112 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
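
The heap-buffer and direct-buffer code paths previously left the input buffers' positions in different states: the byte[]-based path advanced each input relative to wherever doEncode/doDecode left it, while the direct-buffer path returned early and left the positions to the underlying coder. The patch records each input's position up front and, on both paths, sets it to the recorded value plus dataLen. A minimal sketch of that pattern using only java.nio.ByteBuffer (the class and method names are illustrative, not the actual coder API):

    import java.nio.ByteBuffer;

    // Sketch of the pattern this patch applies: remember each input's
    // position before coding, run the coding step (which may or may not move
    // positions itself), then set every non-null input to its recorded
    // position plus dataLen so heap and direct buffers end up identical.
    public class PositionConsistencySketch {
      static void codeWithConsistentPositions(ByteBuffer[] inputs, int dataLen,
                                              Runnable doCode) {
        int[] inputPositions = new int[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
          if (inputs[i] != null) {
            inputPositions[i] = inputs[i].position();
          }
        }

        doCode.run();  // stands in for doEncode/doDecode on either buffer kind

        for (int i = 0; i < inputs.length; i++) {
          if (inputs[i] != null) {
            // dataLen bytes consumed from each present input
            inputs[i].position(inputPositions[i] + dataLen);
          }
        }
      }
    }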


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 0b88ed0..863d047 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -638,6 +638,9 @@ Trunk (Unreleased)
       HADOOP-12544. Erasure Coding: create dummy raw coder to isolate performance
       issues in testing. (Rui Li via zhz)
 
+      HADOOP-12685. Input buffer position after encode/decode not consistent
+      between different kinds of buffers. (Rui Li via zhz)
+
 Release 2.9.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
index 2cfb57c..37a9bcd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
@@ -51,39 +51,44 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
     checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
     checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
 
+    int[] inputPositions = new int[inputs.length];
+    for (int i = 0; i < inputPositions.length; i++) {
+      if (inputs[i] != null) {
+        inputPositions[i] = inputs[i].position();
+      }
+    }
+
     if (usingDirectBuffer) {
       doDecode(inputs, erasedIndexes, outputs);
-      return;
-    }
+    } else {
+      int[] inputOffsets = new int[inputs.length];
+      int[] outputOffsets = new int[outputs.length];
+      byte[][] newInputs = new byte[inputs.length][];
+      byte[][] newOutputs = new byte[outputs.length][];
+
+      ByteBuffer buffer;
+      for (int i = 0; i < inputs.length; ++i) {
+        buffer = inputs[i];
+        if (buffer != null) {
+          inputOffsets[i] = buffer.arrayOffset() + buffer.position();
+          newInputs[i] = buffer.array();
+        }
+      }
 
-    int[] inputOffsets = new int[inputs.length];
-    int[] outputOffsets = new int[outputs.length];
-    byte[][] newInputs = new byte[inputs.length][];
-    byte[][] newOutputs = new byte[outputs.length][];
-
-    ByteBuffer buffer;
-    for (int i = 0; i < inputs.length; ++i) {
-      buffer = inputs[i];
-      if (buffer != null) {
-        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
-        newInputs[i] = buffer.array();
+      for (int i = 0; i < outputs.length; ++i) {
+        buffer = outputs[i];
+        outputOffsets[i] = buffer.arrayOffset() + buffer.position();
+        newOutputs[i] = buffer.array();
       }
-    }
 
-    for (int i = 0; i < outputs.length; ++i) {
-      buffer = outputs[i];
-      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
-      newOutputs[i] = buffer.array();
+      doDecode(newInputs, inputOffsets, dataLen,
+          erasedIndexes, newOutputs, outputOffsets);
     }
 
-    doDecode(newInputs, inputOffsets, dataLen,
-        erasedIndexes, newOutputs, outputOffsets);
-
-    for (int i = 0; i < inputs.length; ++i) {
-      buffer = inputs[i];
-      if (buffer != null) {
+    for (int i = 0; i < inputs.length; i++) {
+      if (inputs[i] != null) {
         // dataLen bytes consumed
-        buffer.position(buffer.position() + dataLen);
+        inputs[i].position(inputPositions[i] + dataLen);
       }
     }
   }
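
In the heap-buffer branch above, each ByteBuffer is handed to the byte[]-based doDecode as its backing array plus an offset of arrayOffset() + position(). A small self-contained illustration of why arrayOffset() is needed (the class name is made up for the example):

    import java.nio.ByteBuffer;

    public class HeapBufferViewSketch {
      public static void main(String[] args) {
        // A heap buffer that does not start at index 0 of its backing array,
        // e.g. the result of slicing a larger wrapped array.
        ByteBuffer buffer = ByteBuffer.wrap(new byte[64], 16, 32).slice();

        byte[] backing = buffer.array();
        // position() alone is relative to the buffer; arrayOffset() shifts it
        // into the coordinate space of the backing array.
        int offset = buffer.arrayOffset() + buffer.position();

        System.out.println("first readable byte is backing[" + offset + "]");
      }
    }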

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
index 13c895c..49cc2c4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
@@ -48,34 +48,42 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
     checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false);
     checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
 
-    if (usingDirectBuffer) {
-      doEncode(inputs, outputs);
-      return;
+    int[] inputPositions = new int[inputs.length];
+    for (int i = 0; i < inputPositions.length; i++) {
+      if (inputs[i] != null) {
+        inputPositions[i] = inputs[i].position();
+      }
     }
 
-    int[] inputOffsets = new int[inputs.length];
-    int[] outputOffsets = new int[outputs.length];
-    byte[][] newInputs = new byte[inputs.length][];
-    byte[][] newOutputs = new byte[outputs.length][];
-
-    ByteBuffer buffer;
-    for (int i = 0; i < inputs.length; ++i) {
-      buffer = inputs[i];
-      inputOffsets[i] = buffer.arrayOffset() + buffer.position();
-      newInputs[i] = buffer.array();
-    }
-
-    for (int i = 0; i < outputs.length; ++i) {
-      buffer = outputs[i];
-      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
-      newOutputs[i] = buffer.array();
+    if (usingDirectBuffer) {
+      doEncode(inputs, outputs);
+    } else {
+      int[] inputOffsets = new int[inputs.length];
+      int[] outputOffsets = new int[outputs.length];
+      byte[][] newInputs = new byte[inputs.length][];
+      byte[][] newOutputs = new byte[outputs.length][];
+
+      ByteBuffer buffer;
+      for (int i = 0; i < inputs.length; ++i) {
+        buffer = inputs[i];
+        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
+        newInputs[i] = buffer.array();
+      }
+
+      for (int i = 0; i < outputs.length; ++i) {
+        buffer = outputs[i];
+        outputOffsets[i] = buffer.arrayOffset() + buffer.position();
+        newOutputs[i] = buffer.array();
+      }
+
+      doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
     }
 
-    doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
-
-    for (int i = 0; i < inputs.length; ++i) {
-      buffer = inputs[i];
-      buffer.position(buffer.position() + dataLen); // dataLen bytes consumed
+    for (int i = 0; i < inputs.length; i++) {
+      if (inputs[i] != null) {
+        // dataLen bytes consumed
+        inputs[i].position(inputPositions[i] + dataLen);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
index ab322fa..1707650 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
@@ -56,9 +56,10 @@ public interface RawErasureDecoder extends RawErasureCoder {
    *
    * If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the
    * content of input buffers may change after the call, subject to concrete
-   * implementation. Anyway the positions of input buffers will move forward.
+   * implementation.
    *
-   * @param inputs input buffers to read data from
+   * @param inputs input buffers to read data from. The buffers' remaining will
+   *               be 0 after decoding
    * @param erasedIndexes indexes of erased units in the inputs array
    * @param outputs output buffers to put decoded data into according to
    *                erasedIndexes, ready for read after the call
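
The updated Javadoc spells out the contract a caller can now rely on: after decode() (and likewise encode()), every non-null input buffer has been consumed in full, so its remaining() is 0 whether it is a heap or a direct buffer. A hedged caller-side check of that contract (class and helper names are illustrative):

    import java.nio.ByteBuffer;

    public class DecodeContractSketch {
      // After the coder call returns, each non-null input should have
      // remaining() == 0, i.e. its position advanced to its limit.
      static void assertInputsConsumed(ByteBuffer[] inputs) {
        for (ByteBuffer in : inputs) {
          if (in != null && in.remaining() != 0) {
            throw new IllegalStateException("input not fully consumed: " + in);
          }
        }
      }
    }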

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
index 91ef714..6303d82 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
@@ -42,8 +42,9 @@ public interface RawErasureEncoder extends RawErasureCoder {
    * content of input buffers may change after the call, subject to concrete
    * implementation. Anyway the positions of input buffers will move forward.
    *
-   * @param inputs input buffers to read data from
-   * @param outputs output buffers to put the encoded data into, read to read
+   * @param inputs input buffers to read data from. The buffers' remaining will
+   *               be 0 after encoding
+   * @param outputs output buffers to put the encoded data into, ready to read
    *                after the call
    */
   void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java
index a35a4dd..3e37e17 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java
@@ -115,4 +115,11 @@ public class TestRSRawCoder extends TestRSRawCoderBase {
     prepare(null, 10, 4, new int[] {0}, new int[] {0});
     testCodingDoMixAndTwice();
   }
+
+  @Test
+  public void testCodingInputBufferPosition() {
+    prepare(null, 6, 3, new int[]{0}, new int[]{0});
+    testInputPosition(false);
+    testInputPosition(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c52b407c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
index 9b6a196..cf77539 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -254,4 +254,38 @@ public abstract class TestRawCoderBase extends TestCoderBase {
     decoder.setConf(getConf());
     return decoder;
   }
+
+  /**
+   * Tests that the input buffer's position is moved to the end after
+   * encode/decode.
+   */
+  protected void testInputPosition(boolean usingDirectBuffer) {
+    this.usingDirectBuffer = usingDirectBuffer;
+    prepareCoders();
+    prepareBufferAllocator(false);
+
+    // verify encode
+    ECChunk[] dataChunks = prepareDataChunksForEncoding();
+    ECChunk[] parityChunks = prepareParityChunksForEncoding();
+    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
+    encoder.encode(dataChunks, parityChunks);
+    verifyBufferPositionAtEnd(dataChunks);
+
+    // verify decode
+    backupAndEraseChunks(clonedDataChunks, parityChunks);
+    ECChunk[] inputChunks = prepareInputChunksForDecoding(
+        clonedDataChunks, parityChunks);
+    ensureOnlyLeastRequiredChunks(inputChunks);
+    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
+    decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
+    verifyBufferPositionAtEnd(inputChunks);
+  }
+
+  private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
+    for (ECChunk chunk : inputChunks) {
+      if (chunk != null) {
+        Assert.assertEquals(0, chunk.getBuffer().remaining());
+      }
+    }
+  }
 }