You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2015/06/10 01:37:47 UTC
hadoop git commit: HDFS-8557 Allow to configure RS and XOR raw
coders. Contributed by Kai Zheng
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 c41b02cc0 -> e299fe86b
HDFS-8557 Allow to configure RS and XOR raw coders. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e299fe86
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e299fe86
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e299fe86
Branch: refs/heads/HDFS-7285
Commit: e299fe86b889968a0093f9f9b097dd71b4f49e88
Parents: c41b02c
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Jun 10 15:35:26 2015 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Jun 10 15:35:26 2015 +0800
----------------------------------------------------------------------
.../hadoop-common/CHANGES-HDFS-EC-7285.txt | 4 +-
.../hadoop/fs/CommonConfigurationKeys.java | 6 +-
.../apache/hadoop/io/erasurecode/CodecUtil.java | 144 +++++++++++++++++++
.../erasurecode/coder/AbstractErasureCoder.java | 67 +--------
.../io/erasurecode/coder/RSErasureDecoder.java | 11 +-
.../io/erasurecode/coder/RSErasureEncoder.java | 9 +-
.../io/erasurecode/coder/XORErasureDecoder.java | 10 +-
.../io/erasurecode/coder/XORErasureEncoder.java | 10 +-
.../hadoop/hdfs/DFSStripedInputStream.java | 23 +--
.../hadoop/hdfs/DFSStripedOutputStream.java | 16 ++-
.../erasurecode/ErasureCodingWorker.java | 3 +-
.../hadoop/hdfs/TestDFSStripedInputStream.java | 7 +-
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 22 +--
.../TestDFSStripedOutputStreamWithFailure.java | 2 +-
14 files changed, 216 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index 505eabd..9ccd3a7 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -66,4 +66,6 @@
HADOOP-12011. Allow to dump verbose information to ease debugging in raw erasure coders
(Kai Zheng)
- HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
\ No newline at end of file
+ HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
+
+ HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 3f2871b..9588254 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -143,10 +143,14 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Supported erasure codec classes */
public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs";
- /** Raw coder factory for the RS codec */
+ /** Raw coder factory for the RS codec. */
public static final String IO_ERASURECODE_CODEC_RS_RAWCODER_KEY =
"io.erasurecode.codec.rs.rawcoder";
+ /** Raw coder factory for the XOR codec. */
+ public static final String IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY =
+ "io.erasurecode.codec.xor.rawcoder";
+
/**
* Service Authorization
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
new file mode 100644
index 0000000..5d22624
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.erasurecode.rawcoder.*;
+
+/**
+ * A codec utility.
+ */
+public final class CodecUtil {
+
+ private CodecUtil() {}
+
+ /**
+ * Create RS raw encoder according to configuration.
+ * @param conf
+ * @param numDataUnits
+ * @param numParityUnits
+ * @return raw encoder
+ */
+ public static RawErasureEncoder createRSRawEncoder(
+ Configuration conf, int numDataUnits, int numParityUnits) {
+ RawErasureCoder rawCoder = createRawCoder(conf,
+ CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
+ true, numDataUnits, numParityUnits);
+ if (rawCoder == null) {
+ rawCoder = new RSRawEncoder(numDataUnits, numParityUnits);
+ }
+
+ return (RawErasureEncoder) rawCoder;
+ }
+
+ /**
+ * Create RS raw decoder according to configuration.
+ * @param conf
+ * @param numDataUnits
+ * @param numParityUnits
+ * @return raw decoder
+ */
+ public static RawErasureDecoder createRSRawDecoder(
+ Configuration conf, int numDataUnits, int numParityUnits) {
+ RawErasureCoder rawCoder = createRawCoder(conf,
+ CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
+ false, numDataUnits, numParityUnits);
+ if (rawCoder == null) {
+ rawCoder = new RSRawDecoder(numDataUnits, numParityUnits);
+ }
+
+ return (RawErasureDecoder) rawCoder;
+ }
+
+ /**
+ * Create XOR raw encoder according to configuration.
+ * @param conf
+ * @param numDataUnits
+ * @param numParityUnits
+ * @return raw encoder
+ */
+ public static RawErasureEncoder createXORRawEncoder(
+ Configuration conf, int numDataUnits, int numParityUnits) {
+ RawErasureCoder rawCoder = createRawCoder(conf,
+ CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
+ true, numDataUnits, numParityUnits);
+ if (rawCoder == null) {
+ rawCoder = new XORRawEncoder(numDataUnits, numParityUnits);
+ }
+
+ return (RawErasureEncoder) rawCoder;
+ }
+
+ /**
+ * Create XOR raw decoder according to configuration.
+ * @param conf
+ * @param numDataUnits
+ * @param numParityUnits
+ * @return raw decoder
+ */
+ public static RawErasureDecoder createXORRawDecoder(
+ Configuration conf, int numDataUnits, int numParityUnits) {
+ RawErasureCoder rawCoder = createRawCoder(conf,
+ CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
+ false, numDataUnits, numParityUnits);
+ if (rawCoder == null) {
+ rawCoder = new XORRawDecoder(numDataUnits, numParityUnits);
+ }
+
+ return (RawErasureDecoder) rawCoder;
+ }
+
+ /**
+ * Create raw coder using specified conf and raw coder factory key.
+ * @param conf
+ * @param rawCoderFactoryKey
+ * @param isEncoder
+ * @param numDataUnits
+ * @param numParityUnits
+ * @return raw coder
+ */
+ public static RawErasureCoder createRawCoder(Configuration conf,
+ String rawCoderFactoryKey, boolean isEncoder, int numDataUnits,
+ int numParityUnits) {
+
+ if (conf == null) {
+ return null;
+ }
+
+ Class<? extends RawErasureCoderFactory> factClass = null;
+ factClass = conf.getClass(rawCoderFactoryKey,
+ factClass, RawErasureCoderFactory.class);
+
+ if (factClass == null) {
+ return null;
+ }
+
+ RawErasureCoderFactory fact;
+ try {
+ fact = factClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new RuntimeException("Failed to create raw coder", e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Failed to create raw coder", e);
+ }
+
+ return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
+ fact.createDecoder(numDataUnits, numParityUnits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java
index c572bad..5cd0ee8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/AbstractErasureCoder.java
@@ -17,13 +17,8 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
* A common class of basic facilities to be shared by encoder and decoder
@@ -36,73 +31,13 @@ public abstract class AbstractErasureCoder
private final int numDataUnits;
private final int numParityUnits;
- /**
- * Create raw decoder using the factory specified by rawCoderFactoryKey
- * @param rawCoderFactoryKey
- * @return raw decoder
- */
- protected RawErasureDecoder createRawDecoder(
- String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
- RawErasureCoder rawCoder = createRawCoder(getConf(),
- rawCoderFactoryKey, false, dataUnitsCount, parityUnitsCount);
- return (RawErasureDecoder) rawCoder;
- }
-
- /**
- * Create raw encoder using the factory specified by rawCoderFactoryKey
- * @param rawCoderFactoryKey
- * @return raw encoder
- */
- protected RawErasureEncoder createRawEncoder(
- String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
- RawErasureCoder rawCoder = createRawCoder(getConf(),
- rawCoderFactoryKey, true, dataUnitsCount, parityUnitsCount);
- return (RawErasureEncoder) rawCoder;
- }
-
- /**
- * Create raw coder using specified conf and raw coder factory key.
- * @param conf
- * @param rawCoderFactoryKey
- * @param isEncoder
- * @return raw coder
- */
- public static RawErasureCoder createRawCoder(Configuration conf,
- String rawCoderFactoryKey, boolean isEncoder, int numDataUnits,
- int numParityUnits) {
-
- if (conf == null) {
- return null;
- }
-
- Class<? extends RawErasureCoderFactory> factClass = null;
- factClass = conf.getClass(rawCoderFactoryKey,
- factClass, RawErasureCoderFactory.class);
-
- if (factClass == null) {
- return null;
- }
-
- RawErasureCoderFactory fact;
- try {
- fact = factClass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException("Failed to create raw coder", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Failed to create raw coder", e);
- }
-
- return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
- fact.createDecoder(numDataUnits, numParityUnits);
- }
-
public AbstractErasureCoder(int numDataUnits, int numParityUnits) {
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
}
public AbstractErasureCoder(ECSchema schema) {
- this(schema.getNumDataUnits(), schema.getNumParityUnits());
+ this(schema.getNumDataUnits(), schema.getNumParityUnits());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
index 57f4373..f56674d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
@@ -17,11 +17,10 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
/**
@@ -53,12 +52,8 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
private RawErasureDecoder checkCreateRSRawDecoder() {
if (rsRawDecoder == null) {
- rsRawDecoder = createRawDecoder(
- CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
- getNumDataUnits(), getNumParityUnits());
- if (rsRawDecoder == null) {
- rsRawDecoder = new RSRawDecoder(getNumDataUnits(), getNumParityUnits());
- }
+ rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
+ getNumDataUnits(), getNumParityUnits());
}
return rsRawDecoder;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
index ab23474..3ed3e20 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
@@ -17,11 +17,10 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
@@ -53,12 +52,8 @@ public class RSErasureEncoder extends AbstractErasureEncoder {
private RawErasureEncoder checkCreateRSRawEncoder() {
if (rawEncoder == null) {
- rawEncoder = createRawEncoder(
- CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
+ rawEncoder = CodecUtil.createRSRawEncoder(getConf(),
getNumDataUnits(), getNumParityUnits());
- if (rawEncoder == null) {
- rawEncoder = new RSRawEncoder(getNumDataUnits(), getNumParityUnits());
- }
}
return rawEncoder;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
index 3fe8d1b..a847418 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
/**
* Xor erasure decoder that decodes a block group.
@@ -39,10 +39,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder {
}
@Override
- protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
- // May be configured
- RawErasureDecoder rawDecoder = new XORRawDecoder(
- getNumDataUnits(), getNumParityUnits());
+ protected ErasureCodingStep prepareDecodingStep(
+ final ECBlockGroup blockGroup) {
+ RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(),
+ getNumDataUnits(), getNumParityUnits());
ECBlock[] inputBlocks = getInputBlocks(blockGroup);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
index 5020896..5c4bcdd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.io.erasurecode.coder;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;
/**
* Xor erasure encoder that encodes a block group.
@@ -39,10 +39,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder {
}
@Override
- protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
- // May be configured
- RawErasureEncoder rawEncoder = new XORRawEncoder(
- getNumDataUnits(), getNumParityUnits());
+ protected ErasureCodingStep prepareEncodingStep(
+ final ECBlockGroup blockGroup) {
+ RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(),
+ getNumDataUnits(), getNumParityUnits());
ECBlock[] inputBlocks = getInputBlocks(blockGroup);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index bf99f17..a7339b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -42,6 +42,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
@@ -155,7 +156,8 @@ public class DFSStripedInputStream extends DFSInputStream {
curStripeRange = new StripeRange(0, 0);
readingService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
- decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
+ decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
+ dataBlkNum, parityBlkNum);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
@@ -207,8 +209,8 @@ public class DFSStripedInputStream extends DFSInputStream {
// The purpose is to get start offset into each block.
long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
cellSize, targetBlockGroup, offsetIntoBlockGroup);
- Preconditions.checkState(
- offsetsForInternalBlocks.length == dataBlkNum + parityBlkNum);
+ Preconditions.checkState(offsetsForInternalBlocks.length ==
+ dataBlkNum + parityBlkNum);
long minOffset = offsetsForInternalBlocks[dataBlkNum];
retry = new ReaderRetryPolicy();
@@ -726,8 +728,10 @@ public class DFSStripedInputStream extends DFSInputStream {
void prepareParityChunk() {
for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
if (alignedStripe.chunks[i] == null) {
- final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
- alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+ final int decodeIndex = convertIndex4Decode(i,
+ dataBlkNum, parityBlkNum);
+ alignedStripe.chunks[i] =
+ new StripingChunk(decodeInputs[decodeIndex]);
alignedStripe.chunks[i].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
break;
@@ -807,7 +811,8 @@ public class DFSStripedInputStream extends DFSInputStream {
parityBlkNum);
decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
(int) alignedStripe.range.spanInBlock);
- alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+ alignedStripe.chunks[i] =
+ new StripingChunk(decodeInputs[decodeIndex]);
if (blockReaders[i] == null) {
prepareParityBlockReader(i);
}
@@ -839,7 +844,8 @@ public class DFSStripedInputStream extends DFSInputStream {
// decoders to work
final int span = (int) alignedStripe.getSpanInBlock();
for (int i = 0; i < alignedStripe.chunks.length; i++) {
- final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+ final int decodeIndex = convertIndex4Decode(i,
+ dataBlkNum, parityBlkNum);
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
for (int j = 0; j < span; j++) {
@@ -857,7 +863,8 @@ public class DFSStripedInputStream extends DFSInputStream {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
- decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+ decodeIndices[pos++] = convertIndex4Decode(i,
+ dataBlkNum, parityBlkNum);
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 0935d5c..bdd3352 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -247,13 +248,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
numDataBlocks = schema.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
- encoder = new RSRawEncoder(numDataBlocks, numParityBlocks);
+ encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
+ numDataBlocks, numParityBlocks);
- coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks);
+ coordinator = new Coordinator(dfsClient.getConf(),
+ numDataBlocks, numAllBlocks);
try {
cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) {
- throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie);
+ throw DFSUtil.toInterruptedIOException(
+ "Failed to create cell buffers", ie);
}
List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
@@ -318,7 +322,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
- private void handleStreamerFailure(String err, Exception e) throws IOException {
+ private void handleStreamerFailure(String err,
+ Exception e) throws IOException {
LOG.warn("Failed: " + err + ", " + this, e);
getCurrentStreamer().setIsFailed(true);
checkStreamers();
@@ -487,7 +492,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return;
}
- final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+ final int firstCellSize =
+ (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
firstCellSize : cellSize;
final ByteBuffer[] buffers = cellBuffers.getBuffers();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 6f3857f..3c9adc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECReco
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -112,7 +113,7 @@ public final class ErasureCodingWorker {
}
private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
- return new RSRawDecoder(numDataUnits, numParityUnits);
+ return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
}
private void initializeStripedReadThreadPool(int num) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index b53983b..b29d582 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -37,8 +37,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -230,8 +232,9 @@ public class TestDFSStripedInputStream {
for (int m : missingBlkIdx) {
decodeInputs[m] = null;
}
- RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
- rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+ RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
+ DATA_BLK_NUM, PARITY_BLK_NUM);
+ rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index e041dbe..3f40dee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.test.GenericTestUtils;
@@ -43,7 +44,8 @@ import org.junit.Before;
import org.junit.Test;
public class TestDFSStripedOutputStream {
- public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
+ public static final Log LOG = LogFactory.getLog(
+ TestDFSStripedOutputStream.class);
static {
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
@@ -55,6 +57,7 @@ public class TestDFSStripedOutputStream {
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
+ private Configuration conf;
private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int stripesPerBlock = 4;
private final int blockSize = cellSize * stripesPerBlock;
@@ -62,7 +65,7 @@ public class TestDFSStripedOutputStream {
@Before
public void setup() throws IOException {
int numDNs = dataBlocks + parityBlocks + 2;
- Configuration conf = new Configuration();
+ conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0);
@@ -140,7 +143,8 @@ public class TestDFSStripedOutputStream {
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
- testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+ testOneFile("/MoreThanABlockGroup2",
+ blockSize * dataBlocks + cellSize+ 123);
}
@@ -251,13 +255,14 @@ public class TestDFSStripedOutputStream {
}
}
- static void verifyParity(final long size, final int cellSize,
+ void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes) {
- verifyParity(size, cellSize, dataBytes, parityBytes, -1);
+ verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
}
- static void verifyParity(final long size, final int cellSize,
- byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) {
+ static void verifyParity(Configuration conf, final long size,
+ final int cellSize, byte[][] dataBytes,
+ byte[][] parityBytes, int killedDnIndex) {
// verify the parity blocks
int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
size, cellSize, dataBytes.length, dataBytes.length);
@@ -275,7 +280,8 @@ public class TestDFSStripedOutputStream {
}
}
final RawErasureEncoder encoder =
- new RSRawEncoder(dataBytes.length, parityBytes.length);
+ CodecUtil.createRSRawEncoder(conf,
+ dataBytes.length, parityBytes.length);
encoder.encode(dataBytes, expectedParityBytes);
for (int i = 0; i < parityBytes.length; i++) {
if (i != killedDnIndex) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e299fe86/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index c232e13..d2e0458 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -335,7 +335,7 @@ public class TestDFSStripedOutputStreamWithFailure {
}
// check parity
- TestDFSStripedOutputStream.verifyParity(
+ TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
lbs.getLocatedBlocks().get(group).getBlockSize(),
CELL_SIZE, dataBlockBytes, parityBlockBytes,
killedDnIndex - dataBlockBytes.length);