You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/20 19:16:49 UTC
hadoop git commit: HADOOP-12060. Fix ByteBuffer usage for raw erasure
coders. Contributed by Kai Zheng.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 06394e376 -> 29495cb8f
HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. Contributed by Kai Zheng.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29495cb8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29495cb8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29495cb8
Branch: refs/heads/HDFS-7285
Commit: 29495cb8f6b940caa9964c39a290ef233ce1ec7c
Parents: 06394e3
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Jul 20 10:15:14 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Jul 20 10:15:14 2015 -0700
----------------------------------------------------------------------
.../hadoop-common/CHANGES-HDFS-EC-7285.txt | 5 +-
.../apache/hadoop/io/erasurecode/CodecUtil.java | 38 ++++----
.../apache/hadoop/io/erasurecode/ECBlock.java | 14 +--
.../hadoop/io/erasurecode/ECBlockGroup.java | 6 +-
.../apache/hadoop/io/erasurecode/ECChunk.java | 6 +-
.../apache/hadoop/io/erasurecode/ECSchema.java | 18 ++--
.../hadoop/io/erasurecode/SchemaLoader.java | 3 +-
.../rawcoder/AbstractRawErasureCoder.java | 42 +++++----
.../rawcoder/AbstractRawErasureDecoder.java | 50 +++++------
.../rawcoder/AbstractRawErasureEncoder.java | 27 +++---
.../rawcoder/RawErasureCoderFactory.java | 8 +-
.../erasurecode/rawcoder/RawErasureDecoder.java | 25 ++++--
.../erasurecode/rawcoder/RawErasureEncoder.java | 24 ++++--
.../hadoop/io/erasurecode/BufferAllocator.java | 91 ++++++++++++++++++++
.../hadoop/io/erasurecode/TestCoderBase.java | 17 +++-
.../erasurecode/coder/TestErasureCoderBase.java | 10 +--
.../erasurecode/rawcoder/TestRawCoderBase.java | 13 +--
17 files changed, 268 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index 9ccd3a7..1f3006e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -68,4 +68,7 @@
HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
- HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
\ No newline at end of file
+ HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
+
+ HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. (Kai Zheng via
+ jing9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
index 5d22624..027d58b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
@@ -22,17 +22,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.*;
/**
- * A codec utility.
+ * A codec and coder utility to help create raw coders conveniently.
*/
public final class CodecUtil {
- private CodecUtil() {}
+ private CodecUtil() { }
/**
* Create RS raw encoder according to configuration.
- * @param conf
- * @param numDataUnits
- * @param numParityUnits
+ * @param conf configuration possibly with some items to configure the coder
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw encoder
*/
public static RawErasureEncoder createRSRawEncoder(
@@ -49,9 +49,9 @@ public final class CodecUtil {
/**
* Create RS raw decoder according to configuration.
- * @param conf
- * @param numDataUnits
- * @param numParityUnits
+ * @param conf configuration possibly with some items to configure the coder
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw decoder
*/
public static RawErasureDecoder createRSRawDecoder(
@@ -68,9 +68,9 @@ public final class CodecUtil {
/**
* Create XOR raw encoder according to configuration.
- * @param conf
- * @param numDataUnits
- * @param numParityUnits
+ * @param conf configuration possibly with some items to configure the coder
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw encoder
*/
public static RawErasureEncoder createXORRawEncoder(
@@ -87,9 +87,9 @@ public final class CodecUtil {
/**
* Create XOR raw decoder according to configuration.
- * @param conf
- * @param numDataUnits
- * @param numParityUnits
+ * @param conf configuration possibly with some items to configure the coder
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw decoder
*/
public static RawErasureDecoder createXORRawDecoder(
@@ -106,11 +106,11 @@ public final class CodecUtil {
/**
* Create raw coder using specified conf and raw coder factory key.
- * @param conf
- * @param rawCoderFactoryKey
- * @param isEncoder
- * @param numDataUnits
- * @param numParityUnits
+ * @param conf configuration possibly with some items to configure the coder
+ * @param rawCoderFactoryKey configuration key to find the raw coder factory
+ * @param isEncoder whether the raw coder to create is an encoder or not
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw coder
*/
public static RawErasureCoder createRawCoder(Configuration conf,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
index 956954a..5c0a160 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
@@ -37,8 +37,8 @@ public class ECBlock {
/**
* A constructor specifying isParity and isErased.
- * @param isParity
- * @param isErased
+ * @param isParity is a parity block
+ * @param isErased is erased or not
*/
public ECBlock(boolean isParity, boolean isErased) {
this.isParity = isParity;
@@ -47,7 +47,7 @@ public class ECBlock {
/**
* Set true if it's for a parity block.
- * @param isParity
+ * @param isParity is parity or not
*/
public void setParity(boolean isParity) {
this.isParity = isParity;
@@ -55,10 +55,10 @@ public class ECBlock {
/**
* Set true if the block is missing.
- * @param isMissing
+ * @param isErased is erased or not
*/
- public void setErased(boolean isMissing) {
- this.isErased = isMissing;
+ public void setErased(boolean isErased) {
+ this.isErased = isErased;
}
/**
@@ -71,7 +71,7 @@ public class ECBlock {
/**
*
- * @return true if it's missing or corrupt due to erasure, otherwise false
+ * @return true if the block is erased, otherwise false
*/
public boolean isErased() {
return isErased;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
index 0a86907..91e4fb8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
@@ -27,8 +27,8 @@ public class ECBlockGroup {
/**
* A constructor specifying data blocks and parity blocks.
- * @param dataBlocks
- * @param parityBlocks
+ * @param dataBlocks data blocks in the group
+ * @param parityBlocks parity blocks in the group
*/
public ECBlockGroup(ECBlock[] dataBlocks, ECBlock[] parityBlocks) {
this.dataBlocks = dataBlocks;
@@ -81,7 +81,7 @@ public class ECBlockGroup {
/**
* Get erased blocks count
- * @return
+ * @return the number of erased blocks
*/
public int getErasedCount() {
int erasedCount = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
index 310c738..d0120d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
@@ -28,7 +28,7 @@ public class ECChunk {
/**
* Wrapping a ByteBuffer
- * @param buffer
+ * @param buffer buffer to be wrapped by the chunk
*/
public ECChunk(ByteBuffer buffer) {
this.chunkBuffer = buffer;
@@ -36,7 +36,7 @@ public class ECChunk {
/**
* Wrapping a bytes array
- * @param buffer
+ * @param buffer buffer to be wrapped by the chunk
*/
public ECChunk(byte[] buffer) {
this.chunkBuffer = ByteBuffer.wrap(buffer);
@@ -52,7 +52,7 @@ public class ECChunk {
/**
* Convert an array of chunks to an array of ByteBuffers
- * @param chunks
+ * @param chunks chunks to convert into buffers
* @return an array of ByteBuffers
*/
public static ByteBuffer[] toBuffers(ECChunk[] chunks) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 1e07d3d..fb02476 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -94,10 +94,10 @@ public final class ECSchema {
/**
* Constructor with key parameters provided.
- * @param schemaName
- * @param codecName
- * @param numDataUnits
- * @param numParityUnits
+ * @param schemaName schema name
+ * @param codecName codec name
+ * @param numDataUnits number of data units used in the schema
+ * @param numParityUnits number of parity units used in the schema
*/
public ECSchema(String schemaName, String codecName,
int numDataUnits, int numParityUnits) {
@@ -107,11 +107,11 @@ public final class ECSchema {
/**
* Constructor with key parameters provided. Note the extraOptions may contain
* additional information for the erasure codec to interpret further.
- * @param schemaName
- * @param codecName
- * @param numDataUnits
- * @param numParityUnits
- * @param extraOptions
+ * @param schemaName schema name
+ * @param codecName codec name
+ * @param numDataUnits number of data units used in the schema
+ * @param numParityUnits number of parity units used in the schema
+ * @param extraOptions extra options to configure the codec
*/
public ECSchema(String schemaName, String codecName, int numDataUnits,
int numParityUnits, Map<String, String> extraOptions) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
index 9b10c78..fce46f8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
@@ -43,7 +43,8 @@ import org.xml.sax.SAXException;
* A EC schema loading utility that loads predefined EC schemas from XML file
*/
public class SchemaLoader {
- private static final Logger LOG = LoggerFactory.getLogger(SchemaLoader.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(
+ SchemaLoader.class.getName());
/**
* Load predefined ec schemas from configuration file. This file is
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
index e6a1542..4b7461e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
@@ -91,37 +91,45 @@ public abstract class AbstractRawErasureCoder
}
/**
- * Check and ensure the buffers are of the length specified by dataLen.
- * @param buffers
- * @param allowNull
- * @param dataLen
+ * Check and ensure the buffers are of the length specified by dataLen, also
+ * ensure the buffers are direct buffers or not according to isDirectBuffer.
+ * @param buffers the buffers to check
+ * @param allowNull whether to allow any element to be null or not
+ * @param dataLen the expected length of data remaining in each buffer
+ * @param isDirectBuffer whether each buffer is expected to be direct or not
*/
- protected void ensureLength(ByteBuffer[] buffers,
- boolean allowNull, int dataLen) {
- for (int i = 0; i < buffers.length; ++i) {
- if (buffers[i] == null && !allowNull) {
+ protected void ensureLengthAndType(ByteBuffer[] buffers, boolean allowNull,
+ int dataLen, boolean isDirectBuffer) {
+ for (ByteBuffer buffer : buffers) {
+ if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
- } else if (buffers[i] != null && buffers[i].remaining() != dataLen) {
- throw new HadoopIllegalArgumentException(
- "Invalid buffer, not of length " + dataLen);
+ } else if (buffer != null) {
+ if (buffer.remaining() != dataLen) {
+ throw new HadoopIllegalArgumentException(
+ "Invalid buffer, not of length " + dataLen);
+ }
+ if (buffer.isDirect() != isDirectBuffer) {
+ throw new HadoopIllegalArgumentException(
+ "Invalid buffer, isDirect should be " + isDirectBuffer);
+ }
}
}
}
/**
* Check and ensure the buffers are of the length specified by dataLen.
- * @param buffers
- * @param allowNull
- * @param dataLen
+ * @param buffers the buffers to check
+ * @param allowNull whether to allow any element to be null or not
+ * @param dataLen the expected length of each buffer
*/
protected void ensureLength(byte[][] buffers,
boolean allowNull, int dataLen) {
- for (int i = 0; i < buffers.length; ++i) {
- if (buffers[i] == null && !allowNull) {
+ for (byte[] buffer : buffers) {
+ if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null");
- } else if (buffers[i] != null && buffers[i].length != dataLen) {
+ } else if (buffer != null && buffer.length != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + dataLen);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
index c6105b0..931cda1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
@@ -41,14 +41,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
checkParameters(inputs, erasedIndexes, outputs);
ByteBuffer validInput = findFirstValidInput(inputs);
+ boolean usingDirectBuffer = validInput.isDirect();
int dataLen = validInput.remaining();
if (dataLen == 0) {
return;
}
- ensureLength(inputs, true, dataLen);
- ensureLength(outputs, false, dataLen);
+ ensureLengthAndType(inputs, true, dataLen, usingDirectBuffer);
+ ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
- boolean usingDirectBuffer = validInput.isDirect();
if (usingDirectBuffer) {
doDecode(inputs, erasedIndexes, outputs);
return;
@@ -63,14 +63,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
- inputOffsets[i] = buffer.position();
+ inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
- outputOffsets[i] = buffer.position();
+ outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
@@ -81,7 +81,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
buffer = inputs[i];
if (buffer != null) {
// dataLen bytes consumed
- buffer.position(inputOffsets[i] + dataLen);
+ buffer.position(buffer.position() + dataLen);
}
}
}
@@ -89,7 +89,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/**
* Perform the real decoding using Direct ByteBuffer.
* @param inputs Direct ByteBuffers expected
- * @param erasedIndexes
+ * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs Direct ByteBuffers expected
*/
protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
@@ -117,12 +117,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/**
* Perform the real decoding using bytes array, supporting offsets and
* lengths.
- * @param inputs
- * @param inputOffsets
- * @param dataLen
- * @param erasedIndexes
- * @param outputs
- * @param outputOffsets
+ * @param inputs the input byte arrays to read data from
+ * @param inputOffsets offsets for the input byte arrays to read data from
+ * @param dataLen how much data are to be read from
+ * @param erasedIndexes indexes of erased units in the inputs array
+ * @param outputs the output byte arrays to write resultant data into
+ * @param outputOffsets offsets from which to write resultant data into
*/
protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes,
@@ -139,12 +139,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/**
* Check and validate decoding parameters, throw exception accordingly. The
* checking assumes it's a MDS code. Other code can override this.
- * @param inputs
- * @param erasedIndexes
- * @param outputs
+ * @param inputs input buffers to check
+ * @param erasedIndexes indexes of erased units in the inputs array
+ * @param outputs output buffers to check
*/
- protected void checkParameters(Object[] inputs, int[] erasedIndexes,
- Object[] outputs) {
+ protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
+ T[] outputs) {
if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
throw new IllegalArgumentException("Invalid inputs length");
}
@@ -160,8 +160,8 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
}
int validInputs = 0;
- for (int i = 0; i < inputs.length; ++i) {
- if (inputs[i] != null) {
+ for (T input : inputs) {
+ if (input != null) {
validInputs += 1;
}
}
@@ -177,7 +177,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
* not to read.
* @return indexes into inputs array
*/
- protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
+ protected <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
int[] invalidIndexes = new int[inputs.length];
int idx = 0;
for (int i = 0; i < inputs.length; i++) {
@@ -191,13 +191,13 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/**
* Find the valid input from all the inputs.
- * @param inputs
+ * @param inputs input buffers to look for valid input
* @return the first valid input
*/
protected static <T> T findFirstValidInput(T[] inputs) {
- for (int i = 0; i < inputs.length; i++) {
- if (inputs[i] != null) {
- return inputs[i];
+ for (T input : inputs) {
+ if (input != null) {
+ return input;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
index d1faa8c..a0b3cfe 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
@@ -37,14 +37,15 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
@Override
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
checkParameters(inputs, outputs);
+
+ boolean usingDirectBuffer = inputs[0].isDirect();
int dataLen = inputs[0].remaining();
if (dataLen == 0) {
return;
}
- ensureLength(inputs, false, dataLen);
- ensureLength(outputs, false, dataLen);
+ ensureLengthAndType(inputs, false, dataLen, usingDirectBuffer);
+ ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
- boolean usingDirectBuffer = inputs[0].isDirect();
if (usingDirectBuffer) {
doEncode(inputs, outputs);
return;
@@ -58,13 +59,13 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
- inputOffsets[i] = buffer.position();
+ inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
- outputOffsets[i] = buffer.position();
+ outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
@@ -102,11 +103,11 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
/**
* Perform the real encoding work using bytes array, supporting offsets
* and lengths.
- * @param inputs
- * @param inputOffsets
- * @param dataLen
- * @param outputs
- * @param outputOffsets
+ * @param inputs the input byte arrays to read data from
+ * @param inputOffsets offsets for the input byte arrays to read data from
+ * @param dataLen how much data is to be read
+ * @param outputs the output byte arrays to write resultant data into
+ * @param outputOffsets offsets from which to write resultant data into
*/
protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs,
@@ -121,10 +122,10 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
/**
* Check and validate encoding parameters, throw exception accordingly.
- * @param inputs
- * @param outputs
+ * @param inputs input buffers to check
+ * @param outputs output buffers to check
*/
- protected void checkParameters(Object[] inputs, Object[] outputs) {
+ protected <T> void checkParameters(T[] inputs, T[] outputs) {
if (inputs.length != getNumDataUnits()) {
throw new HadoopIllegalArgumentException("Invalid inputs length");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
index 26eddfc..280daf3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
@@ -26,16 +26,16 @@ public interface RawErasureCoderFactory {
/**
* Create raw erasure encoder.
- * @param numDataUnits
- * @param numParityUnits
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw erasure encoder
*/
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
/**
* Create raw erasure decoder.
- * @param numDataUnits
- * @param numParityUnits
+ * @param numDataUnits number of data units in a coding group
+ * @param numParityUnits number of parity units in a coding group
* @return raw erasure decoder
*/
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
index ad7f32d..e2d01d9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
@@ -33,7 +33,8 @@ public interface RawErasureDecoder extends RawErasureCoder {
/**
* Decode with inputs and erasedIndexes, generates outputs.
* How to prepare for inputs:
- * 1. Create an array containing parity units + data units;
+ * 1. Create an array containing parity units + data units. Please note the
+ * parity units should be first or before the data units.
* 2. Set null in the array locations specified via erasedIndexes to indicate
* they're erased and no data are to read from;
* 3. Set null in the array locations for extra redundant items, as they're
@@ -48,29 +49,39 @@ public interface RawErasureDecoder extends RawErasureCoder {
* erasedIndexes = [5] // index of d2 into inputs array
* outputs = [a-writable-buffer]
*
- * @param inputs inputs to read data from
+ * Note, for both inputs and outputs, mixing on-heap buffers and direct
+ * buffers is not allowed.
+ *
+ * @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to
- * erasedIndexes
+ * erasedIndexes, ready for reading the result data from after
+ * the call
*/
public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs);
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
- * @param inputs inputs to read data from
+ * @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to
- * erasedIndexes
+ * erasedIndexes, ready for reading the result data from after
+ * the call
*/
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
/**
* Decode with inputs and erasedIndexes, generates outputs. More see above.
- * @param inputs inputs to read data from
+ *
+ * Note, for both input and output ECChunks, mixing on-heap buffers and
+ * direct buffers is not allowed.
+ *
+ * @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to
- * erasedIndexes
+ * erasedIndexes, ready for reading the result data from after
+ * the call
*/
public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
index 974f86c..7571f09 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
@@ -31,23 +31,33 @@ import java.nio.ByteBuffer;
public interface RawErasureEncoder extends RawErasureCoder {
/**
- * Encode with inputs and generates outputs
- * @param inputs
+ * Encode with inputs and generates outputs.
+ *
+ * Note, for both inputs and outputs, mixing on-heap buffers and direct
+ * buffers is not allowed.
+ *
+ * @param inputs inputs to read data from, contents may change after the call
* @param outputs outputs to write into for data generated, ready for reading
* the result data from after the call
*/
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);
/**
* Encode with inputs and generates outputs
- * @param inputs
- * @param outputs
+ * @param inputs inputs to read data from, contents may change after the call
+ * @param outputs outputs to write into for data generated, ready for reading
+ * the result data from after the call
*/
public void encode(byte[][] inputs, byte[][] outputs);
/**
- * Encode with inputs and generates outputs
- * @param inputs
- * @param outputs
+ * Encode with inputs and generates outputs.
+ *
+ * Note, for both input and output ECChunks, mixing on-heap buffers and
+ * direct buffers is not allowed.
+ *
+ * @param inputs inputs to read data from, contents may change after the call
+ * @param outputs outputs to write into for data generated, ready for reading
+ * the result data from after the call
*/
public void encode(ECChunk[] inputs, ECChunk[] outputs);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
new file mode 100644
index 0000000..8f552b7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * An abstract allocator of ByteBuffers (on-heap or direct) used by tests.
+ */
+public abstract class BufferAllocator {
+ private boolean usingDirect = false; // true: direct buffers; false: on-heap
+
+ public BufferAllocator(boolean usingDirect) {
+ this.usingDirect = usingDirect;
+ }
+
+ protected boolean isUsingDirect() {
+ return usingDirect;
+ }
+
+ /**
+ * Allocate and return a ByteBuffer of the specified length.
+ * @param bufferLen number of bytes the returned buffer must be able to hold
+ * @return a buffer of bufferLen bytes, as produced by the concrete allocator
+ */
+ public abstract ByteBuffer allocate(int bufferLen);
+
+ /**
+ * A simple buffer allocator that creates a brand-new buffer on every call
+ * through ByteBuffer's allocate/allocateDirect API.
+ */
+ public static class SimpleBufferAllocator extends BufferAllocator {
+
+ public SimpleBufferAllocator(boolean usingDirect) {
+ super(usingDirect);
+ }
+
+ @Override
+ public ByteBuffer allocate(int bufferLen) {
+ return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
+ ByteBuffer.allocate(bufferLen);
+ }
+ }
+
+ /**
+ * A buffer allocator that carves each buffer out of one large pre-allocated
+ * buffer via ByteBuffer.slice(). If the remaining space is too small it falls
+ * back to a fresh allocation like SimpleBufferAllocator, so size it generously.
+ */
+ public static class SlicedBufferAllocator extends BufferAllocator {
+ private ByteBuffer overallBuffer; // its position marks where unused space starts
+
+ public SlicedBufferAllocator(boolean usingDirect, int totalBufferLen) {
+ super(usingDirect);
+ overallBuffer = isUsingDirect() ?
+ ByteBuffer.allocateDirect(totalBufferLen) :
+ ByteBuffer.allocate(totalBufferLen);
+ }
+
+ @Override
+ public ByteBuffer allocate(int bufferLen) {
+ if (bufferLen > overallBuffer.capacity() - overallBuffer.position()) {
+ // Not enough space left for the requested length: allocate a new buffer
+ return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
+ ByteBuffer.allocate(bufferLen);
+ }
+
+ overallBuffer.limit(overallBuffer.position() + bufferLen); // bound the slice
+ ByteBuffer result = overallBuffer.slice(); // covers [position, limit)
+ overallBuffer.position(overallBuffer.position() + bufferLen); // skip past it
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
index 10edae8..8f277f4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.erasurecode.BufferAllocator.SimpleBufferAllocator;
+import org.apache.hadoop.io.erasurecode.BufferAllocator.SlicedBufferAllocator;
import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil;
import java.nio.ByteBuffer;
@@ -40,6 +42,7 @@ public abstract class TestCoderBase {
protected int numParityUnits;
protected int baseChunkSize = 513;
private int chunkSize = baseChunkSize;
+ private BufferAllocator allocator;
private byte[] zeroChunkBytes;
@@ -70,6 +73,17 @@ public abstract class TestCoderBase {
this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
}
+ protected void prepareBufferAllocator(boolean usingSlicedBuffer) { // (re)creates allocator
+ if (usingSlicedBuffer) {
+ int roughEstimationSpace =
+ chunkSize * (numDataUnits + numParityUnits) * 10; // 10x one chunk per unit; a rough guess
+ allocator = new SlicedBufferAllocator(usingDirectBuffer,
+ roughEstimationSpace);
+ } else { // allocate every buffer independently
+ allocator = new SimpleBufferAllocator(usingDirectBuffer);
+ }
+ }
+
/**
* Set true during setup if want to dump test settings and coding data,
* useful in debugging.
@@ -299,8 +313,7 @@ public abstract class TestCoderBase {
*/
int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary
int allocLen = startOffset + bufferLen + startOffset;
- ByteBuffer buffer = usingDirectBuffer ?
- ByteBuffer.allocateDirect(allocLen) : ByteBuffer.allocate(allocLen);
+ ByteBuffer buffer = allocator.allocate(allocLen);
buffer.limit(startOffset + bufferLen);
fillDummyData(buffer, startOffset);
startBufferWithZero = ! startBufferWithZero;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
index 98fa956..738d28e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
@@ -65,14 +65,14 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
* The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data.
*/
- performTestCoding(baseChunkSize);
- performTestCoding(baseChunkSize - 17);
- performTestCoding(baseChunkSize + 16);
+ performTestCoding(baseChunkSize, true);
+ performTestCoding(baseChunkSize - 17, false);
+ performTestCoding(baseChunkSize + 16, true);
}
- private void performTestCoding(int chunkSize) {
+ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer) {
setChunkSize(chunkSize);
-
+ prepareBufferAllocator(usingSlicedBuffer);
// Generate data and encode
ECBlockGroup blockGroup = prepareBlockGroupForEncoding();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
index 587ce96..2b7a3c4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -68,9 +68,9 @@ public abstract class TestRawCoderBase extends TestCoderBase {
* The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data.
*/
- performTestCoding(baseChunkSize, false, false);
- performTestCoding(baseChunkSize - 17, false, false);
- performTestCoding(baseChunkSize + 16, false, false);
+ performTestCoding(baseChunkSize, true, false, false);
+ performTestCoding(baseChunkSize - 17, false, false, false);
+ performTestCoding(baseChunkSize + 16, true, false, false);
}
/**
@@ -82,7 +82,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
prepareCoders();
try {
- performTestCoding(baseChunkSize, true, false);
+ performTestCoding(baseChunkSize, false, true, false);
Assert.fail("Encoding test with bad input should fail");
} catch (Exception e) {
// Expected
@@ -98,7 +98,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
prepareCoders();
try {
- performTestCoding(baseChunkSize, false, true);
+ performTestCoding(baseChunkSize, false, false, true);
Assert.fail("Decoding test with bad output should fail");
} catch (Exception e) {
// Expected
@@ -122,9 +122,10 @@ public abstract class TestRawCoderBase extends TestCoderBase {
}
}
- private void performTestCoding(int chunkSize,
+ private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
boolean useBadInput, boolean useBadOutput) {
setChunkSize(chunkSize);
+ prepareBufferAllocator(usingSlicedBuffer);
dumpSetting();