Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/19 07:15:27 UTC
[23/50] hadoop git commit: HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contributed by Yi Liu.
HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contributed by Yi Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1efb8ed4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1efb8ed4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1efb8ed4
Branch: refs/heads/HDFS-7285
Commit: 1efb8ed4bdc73f6f7602af8e74e4a74446a33497
Parents: dcfe095
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue May 5 16:33:56 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon May 18 22:11:08 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/BlockReader.java | 6 +
.../apache/hadoop/hdfs/BlockReaderLocal.java | 5 +
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 5 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 10 +-
.../apache/hadoop/hdfs/RemoteBlockReader.java | 5 +
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 5 +
.../hadoop/hdfs/server/datanode/DNConf.java | 27 +
.../hadoop/hdfs/server/datanode/DataNode.java | 31 +-
.../erasurecode/ErasureCodingWorker.java | 893 ++++++++++++++++++-
.../hadoop/hdfs/util/StripedBlockUtil.java | 49 +-
.../src/main/resources/hdfs-default.xml | 31 +-
.../hadoop/hdfs/TestRecoverStripedFile.java | 356 ++++++++
14 files changed, 1377 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 7efaa5a..0d2d448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -175,3 +175,6 @@
HDFS-7672. Handle write failure for stripping blocks and refactor the
existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo)
+
+ HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks.
+ (Yi Liu via Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index aa3e8ba..0a5511e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.util.DataChecksum;
/**
* A BlockReader is responsible for reading a single block
@@ -99,4 +100,9 @@ public interface BlockReader extends ByteBufferReadable {
* supported.
*/
ClientMmap getClientMmap(EnumSet<ReadOption> opts);
+
+ /**
+ * @return The DataChecksum used by the read block
+ */
+ DataChecksum getDataChecksum();
}
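
For illustration, a minimal sketch (separate from the patch itself) of how the new getDataChecksum() accessor is used: the recovery code further down derives its striped read buffer size from the block's checksum parameters. The helper name alignedBufferSize and the wrapper class are hypothetical; only the BlockReader and DataChecksum calls come from this commit.

import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.util.DataChecksum;

class StripedReadBufferSizing {
  // Mirror of ErasureCodingWorker's buffer sizing: round the requested size
  // down to a multiple of bytesPerChecksum, but never below one chunk.
  static int alignedBufferSize(BlockReader reader, int requestedSize) {
    DataChecksum checksum = reader.getDataChecksum();
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    return requestedSize < bytesPerChecksum
        ? bytesPerChecksum
        : requestedSize - requestedSize % bytesPerChecksum;
  }
}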
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index d913f3a..0b2420d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -738,4 +738,9 @@ class BlockReaderLocal implements BlockReader {
void forceUnanchorable() {
replica.getSlot().makeUnanchorable();
}
+
+ @Override
+ public DataChecksum getDataChecksum() {
+ return checksum;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index c16ffdf..04cf733 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -732,4 +732,9 @@ class BlockReaderLocalLegacy implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
+
+ @Override
+ public DataChecksum getDataChecksum() {
+ return checksum;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fe5ddb6..019f14b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -369,6 +369,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
+ public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
+ public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
+ public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
+ public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
+ public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
+ public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
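
A minimal sketch (not part of the patch) showing how the three new striped-read keys are resolved from a Configuration; this is essentially what the ErasureCodingWorker constructor later in this commit does. The class name is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class StripedReadConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Each value falls back to the compile-time default added in this patch.
    int threads = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT);
    int bufferSize = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
    int thresholdMillis = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
    System.out.println("stripedread threads=" + threads
        + " buffer.size=" + bufferSize
        + " threshold.millis=" + thresholdMillis);
  }
}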
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 9cd1ec1..a26e35e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -37,7 +37,7 @@ import org.apache.htrace.Span;
****************************************************************/
@InterfaceAudience.Private
-class DFSPacket {
+public class DFSPacket {
public static final long HEART_BEAT_SEQNO = -1L;
private static long[] EMPTY = new long[0];
private final long seqno; // sequence number of buffer in block
@@ -80,7 +80,7 @@ class DFSPacket {
* @param checksumSize the size of checksum
* @param lastPacketInBlock if this is the last packet
*/
- DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+ public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
int checksumSize, boolean lastPacketInBlock) {
this.lastPacketInBlock = lastPacketInBlock;
this.numChunks = 0;
@@ -114,7 +114,7 @@ class DFSPacket {
dataPos += len;
}
- synchronized void writeData(ByteBuffer inBuffer, int len)
+ public synchronized void writeData(ByteBuffer inBuffer, int len)
throws ClosedChannelException {
checkBuffer();
len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
@@ -135,7 +135,7 @@ class DFSPacket {
* @param len the length of checksums to write
* @throws ClosedChannelException
*/
- synchronized void writeChecksum(byte[] inarray, int off, int len)
+ public synchronized void writeChecksum(byte[] inarray, int off, int len)
throws ClosedChannelException {
checkBuffer();
if (len == 0) {
@@ -154,7 +154,7 @@ class DFSPacket {
* @param stm
* @throws IOException
*/
- synchronized void writeTo(DataOutputStream stm) throws IOException {
+ public synchronized void writeTo(DataOutputStream stm) throws IOException {
checkBuffer();
final int dataLen = dataPos - dataStart;
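
DFSPacket and its writeChecksum/writeData/writeTo methods become public so that datanode-side recovery code can assemble packets itself. The following is a hedged sketch of that calling pattern, extracted from the transferData2Targets() loop later in this commit; the class and method names here are illustrative, and checksumBuf is assumed large enough to hold checksums for the whole buffer.

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.DFSPacket;
import org.apache.hadoop.util.DataChecksum;

class PacketSendSketch {
  // Write one recovered buffer to a target stream as a sequence of packets:
  // checksums are written first, then the data chunk, then the packet itself.
  static long sendBuffer(ByteBuffer buffer, DataChecksum checksum,
      DataOutputStream out, byte[] packetBuf, byte[] checksumBuf,
      int maxChunksPerPacket, long offsetInBlock, long seqno)
      throws IOException {
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    int checksumSize = checksum.getChecksumSize();
    checksum.calculateChunkedSums(buffer.array(), 0, buffer.remaining(),
        checksumBuf, 0);
    int ckOff = 0;
    while (buffer.remaining() > 0) {
      DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
          offsetInBlock, seqno++, checksumSize, false);
      int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
      int toWrite = Math.min(buffer.remaining(), maxBytesToPacket);
      int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
      packet.writeChecksum(checksumBuf, ckOff, ckLen);
      ckOff += ckLen;
      packet.writeData(buffer, toWrite);
      packet.writeTo(out); // send; acks are not checked, as noted below
      offsetInBlock += toWrite;
    }
    return offsetInBlock;
  }
}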
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index d70f419..70cce7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
+
+ @Override
+ public DataChecksum getDataChecksum() {
+ return checksum;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index c368d65..cce44b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -474,4 +474,9 @@ public class RemoteBlockReader2 implements BlockReader {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
+
+ @Override
+ public DataChecksum getDataChecksum() {
+ return checksum;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4b7fbc3..d25642f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -236,6 +236,33 @@ public class DNConf {
}
/**
+ * Returns true if connecting to the datanode via hostname.
+ *
+ * @return true if connecting to the datanode via hostname
+ */
+ public boolean getConnectToDnViaHostname() {
+ return connectToDnViaHostname;
+ }
+
+ /**
+ * Returns socket timeout
+ *
+ * @return int socket timeout
+ */
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ /**
+ * Returns socket write timeout
+ *
+ * @return int socket write timeout
+ */
+ public int getSocketWriteTimeout() {
+ return socketWriteTimeout;
+ }
+
+ /**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 221ba38..5eca2c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1161,7 +1161,8 @@ public class DataNode extends ReconfigurableBase
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
- ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker
+ // Initialize ErasureCoding worker
+ ecWorker = new ErasureCodingWorker(conf, this);
}
/**
@@ -1226,6 +1227,10 @@ public class DataNode extends ReconfigurableBase
return UUID.randomUUID().toString();
}
+ public SaslDataTransferClient getSaslClient() {
+ return saslClient;
+ }
+
/**
* Verify that the DatanodeUuid has been initialized. If this is a new
* datanode then we generate a new Datanode Uuid and persist it to disk.
@@ -1488,7 +1493,7 @@ public class DataNode extends ReconfigurableBase
/**
* Creates either NIO or regular depending on socketWriteTimeout.
*/
- protected Socket newSocket() throws IOException {
+ public Socket newSocket() throws IOException {
return (dnConf.socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
@@ -2143,11 +2148,8 @@ public class DataNode extends ReconfigurableBase
//
// Header info
//
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockPoolTokenSecretManager.generateToken(b,
- EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
- }
+ Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
+ EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@@ -2214,6 +2216,19 @@ public class DataNode extends ReconfigurableBase
}
}
+ /**
+ * Use BlockTokenSecretManager to generate a block token for the current user.
+ */
+ public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
+ EnumSet<AccessMode> mode) throws IOException {
+ Token<BlockTokenIdentifier> accessToken =
+ BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
+ }
+ return accessToken;
+ }
+
/**
* Returns a new DataEncryptionKeyFactory that generates a key from the
* BlockPoolTokenSecretManager, using the block pool ID of the given block.
@@ -2221,7 +2236,7 @@ public class DataNode extends ReconfigurableBase
* @param block for which the factory needs to create a key
* @return DataEncryptionKeyFactory for block's block pool ID
*/
- DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+ public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
final ExtendedBlock block) {
return new DataEncryptionKeyFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 6430308..c4e568f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -17,15 +17,68 @@
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSPacket;
+import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
-import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
/**
* ErasureCodingWorker handles the erasure coding recovery work commands. These
@@ -34,41 +87,60 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
* commands.
*/
public final class ErasureCodingWorker {
-
+ private final Log LOG = DataNode.LOG;
+
+ private final DataNode datanode;
private Configuration conf;
- RawErasureCoder rawEncoder = null;
- RawErasureCoder rawDecoder = null;
- public ErasureCodingWorker(Configuration conf) {
+ private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
+ private final int STRIPED_READ_THRESHOLD_MILLIS;
+ private final int STRIPED_READ_BUFFER_SIZE;
+
+ public ErasureCodingWorker(Configuration conf, DataNode datanode) {
+ this.datanode = datanode;
this.conf = conf;
- initialize();
- }
- /**
- * Initializes the required resources for handling the erasure coding recovery
- * work.
- */
- public void initialize() {
- // Right now directly used RS coder. Once other coders integration ready, we
- // can load preferred codec here.
- initializeErasureEncoder();
- initializeErasureDecoder();
+ STRIPED_READ_THRESHOLD_MILLIS = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
+ initializeStripedReadThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
+ STRIPED_READ_BUFFER_SIZE = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
}
- private void initializeErasureDecoder() {
- rawDecoder = AbstractErasureCoder.createRawCoder(conf,
- CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, false);
- if (rawDecoder == null) {
- rawDecoder = new RSRawDecoder();
- }
+ private RawErasureEncoder newEncoder() {
+ return new RSRawEncoder();
+ }
+
+ private RawErasureDecoder newDecoder() {
+ return new RSRawDecoder();
}
- private void initializeErasureEncoder() {
- rawEncoder = AbstractErasureCoder.createRawCoder(conf,
- CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, true);
- if (rawEncoder == null) {
- rawEncoder = new RSRawEncoder();
- }
+ private void initializeStripedReadThreadPool(int num) {
+ STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedRead-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+ LOG.info("Execution for striped reading rejected, "
+ + "Executing in current thread");
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
}
/**
@@ -78,6 +150,765 @@ public final class ErasureCodingWorker {
* BlockECRecoveryInfo
*/
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
- // HDFS-7348 : Implement the actual recovery process
+ for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
+ try {
+ new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block " +
+ recoveryInfo.getExtendedBlock().getLocalBlock(), e);
+ }
+ }
+ }
+
+ /**
+ * ReconstructAndTransferBlock recovers one or more missing striped blocks in
+ * a striped block group; the number of live striped blocks must be no less
+ * than the number of data blocks.
+ *
+ * | <- Striped Block Group -> |
+ * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group
+ * | | | |
+ * v v v v
+ * +------+ +------+ +------+ +------+
+ * |cell_0| |cell_1| |cell_2| |cell_3| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_4| |cell_5| |cell_6| |cell_7| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_8| |cell_9| |cell10| |cell11| ...
+ * +------+ +------+ +------+ +------+
+ * ... ... ... ...
+ *
+ *
+ * We use the following steps to recover a striped block group. In each round
+ * we recover <code>bufferSize</code> bytes until we finish; the
+ * <code>bufferSize</code> is configurable and may be smaller or larger than
+ * the cell size:
+ * step1: read <code>bufferSize</code> bytes from the minimum number of
+ * sources required by recovery.
+ * step2: decode data for targets.
+ * step3: transfer data to targets.
+ *
+ * In step1, we try to read <code>bufferSize</code> bytes from the minimum
+ * number of sources; if a source is corrupt or stale, a read from a new
+ * source will be scheduled. The best sources are remembered for the next
+ * round and may be updated in each round.
+ *
+ * In step2, typically if the source blocks we read are all data blocks we
+ * need to call encode, and if at least one of them is a parity block we
+ * need to call decode. Notice we only read once and recover all of the
+ * missing striped blocks, even if there is more than one.
+ *
+ * In step3, we send the recovered data to the targets by constructing
+ * packets and sending them directly. As with continuous block replication,
+ * we don't check the packet acks. Since the datanode doing the recovery
+ * work is one of the source datanodes, the recovered data is sent
+ * remotely to the targets.
+ *
+ * There are some points where we can make further improvements in the next
+ * phase:
+ * 1. We can read the block file directly on the local datanode; currently
+ * we use a remote block reader. (Notice short-circuit is not a good
+ * choice, see inline comments).
+ * 2. Do we need to check the packet ack for EC recovery? Since EC recovery
+ * is more expensive than continuous block replication and needs to read
+ * from several other datanodes, should we make sure the recovered
+ * result is received by the targets?
+ */
+ private class ReconstructAndTransferBlock implements Runnable {
+ private final int dataBlkNum;
+ private final int parityBlkNum;
+ private final int cellSize;
+
+ private RawErasureEncoder encoder;
+ private RawErasureDecoder decoder;
+
+ // Striped read buffer size
+ private int bufferSize;
+
+ private final ExtendedBlock blockGroup;
+ // position in striped block
+ private long positionInBlock;
+
+ // sources
+ private final short[] liveIndices;
+ private DatanodeInfo[] sources;
+
+ private List<StripedReader> stripedReaders;
+
+ // targets
+ private DatanodeInfo[] targets;
+ private StorageType[] targetStorageTypes;
+
+ private short[] targetIndices;
+ private ByteBuffer[] targetBuffers;
+
+ private Socket[] targetSockets;
+ private DataOutputStream[] targetOutputStreams;
+ private DataInputStream[] targetInputStreams;
+
+ private long[] blockOffset4Targets;
+ private long[] seqNo4Targets;
+
+ private final int WRITE_PACKET_SIZE = 64 * 1024;
+ private DataChecksum checksum;
+ private int maxChunksPerPacket;
+ private byte[] packetBuf;
+ private byte[] checksumBuf;
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ private CachingStrategy cachingStrategy;
+
+ private Map<Future<Void>, Integer> futures = new HashMap<>();
+ private CompletionService<Void> readService =
+ new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
+
+ ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
+ ECSchema schema = recoveryInfo.getECSchema();
+ dataBlkNum = schema.getNumDataUnits();
+ parityBlkNum = schema.getNumParityUnits();
+ cellSize = schema.getChunkSize();
+
+ blockGroup = recoveryInfo.getExtendedBlock();
+
+ liveIndices = recoveryInfo.getLiveBlockIndices();
+ sources = recoveryInfo.getSourceDnInfos();
+ stripedReaders = new ArrayList<>(sources.length);
+
+ Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
+ "No enough live striped blocks.");
+ Preconditions.checkArgument(liveIndices.length == sources.length);
+
+ targets = recoveryInfo.getTargetDnInfos();
+ targetStorageTypes = recoveryInfo.getTargetStorageTypes();
+ targetIndices = new short[targets.length];
+ targetBuffers = new ByteBuffer[targets.length];
+
+ targetSockets = new Socket[targets.length];
+ targetOutputStreams = new DataOutputStream[targets.length];
+ targetInputStreams = new DataInputStream[targets.length];
+
+ blockOffset4Targets = new long[targets.length];
+ seqNo4Targets = new long[targets.length];
+
+ for (int i = 0; i < targets.length; i++) {
+ blockOffset4Targets[i] = 0;
+ seqNo4Targets[i] = 0;
+ }
+
+ getTargetIndices();
+ cachingStrategy = CachingStrategy.newDefaultStrategy();
+ }
+
+ private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize,
+ dataBlkNum, i);
+ }
+
+ private long getBlockLen(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(),
+ cellSize, dataBlkNum, i);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Store the indices of successfully read sources.
+ // This will be updated after doing the real read.
+ int[] success = new int[dataBlkNum];
+
+ int nsuccess = 0;
+ for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) {
+ StripedReader reader = new StripedReader(liveIndices[i]);
+ stripedReaders.add(reader);
+
+ BlockReader blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), 0, sources[i]);
+ if (blockReader != null) {
+ initChecksumAndBufferSizeIfNeeded(blockReader);
+ reader.blockReader = blockReader;
+ reader.buffer = ByteBuffer.allocate(bufferSize);
+ success[nsuccess++] = i;
+ }
+ }
+
+ if (nsuccess < dataBlkNum) {
+ String error = "Can't find minimum sources required by "
+ + "recovery, block id: " + blockGroup.getBlockId();
+ LOG.warn(error);
+ throw new IOException(error);
+ }
+
+ for (int i = 0; i < targets.length; i++) {
+ targetBuffers[i] = ByteBuffer.allocate(bufferSize);
+ }
+
+ checksumSize = checksum.getChecksumSize();
+ int chunkSize = bytesPerChecksum + checksumSize;
+ maxChunksPerPacket = Math.max(
+ (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
+ int maxPacketSize = chunkSize * maxChunksPerPacket
+ + PacketHeader.PKT_MAX_HEADER_LEN;
+
+ packetBuf = new byte[maxPacketSize];
+ checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
+
+ // Store whether the target is success
+ boolean[] targetsStatus = new boolean[targets.length];
+ if (initTargetStreams(targetsStatus) == 0) {
+ String error = "All targets are failed.";
+ LOG.warn(error);
+ throw new IOException(error);
+ }
+
+ long firstStripedBlockLength = getBlockLen(blockGroup, 0);
+ while (positionInBlock < firstStripedBlockLength) {
+ int toRead = Math.min(
+ bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+ // step1: read minimum striped buffer size data required by recovery.
+ nsuccess = readMinimumStripedData4Recovery(success);
+
+ if (nsuccess < dataBlkNum) {
+ String error = "Can't read data from minimum number of sources "
+ + "required by recovery, block id: " + blockGroup.getBlockId();
+ LOG.warn(error);
+ throw new IOException(error);
+ }
+
+ // step2: encode/decode to recover targets
+ long remaining = firstStripedBlockLength - positionInBlock;
+ int toRecoverLen = remaining < bufferSize ?
+ (int)remaining : bufferSize;
+ recoverTargets(success, targetsStatus, toRecoverLen);
+
+ // step3: transfer data
+ if (transferData2Targets(targetsStatus) == 0) {
+ String error = "Transfer failed for all targets.";
+ LOG.warn(error);
+ throw new IOException(error);
+ }
+
+ clearBuffers();
+ positionInBlock += toRead;
+ }
+
+ endTargetBlocks(targetsStatus);
+
+ // Currently we don't check the acks for packets; this is similar to
+ // continuous block replication.
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block: " + blockGroup);
+ } finally {
+ // close block readers
+ for (StripedReader stripedReader : stripedReaders) {
+ closeBlockReader(stripedReader.blockReader);
+ }
+ for (int i = 0; i < targets.length; i++) {
+ IOUtils.closeStream(targetOutputStreams[i]);
+ IOUtils.closeStream(targetInputStreams[i]);
+ IOUtils.closeStream(targetSockets[i]);
+ }
+ }
+ }
+
+ // init checksum from block reader
+ private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
+ if (checksum == null) {
+ checksum = blockReader.getDataChecksum();
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ // The bufferSize is rounded down to a multiple of bytesPerChecksum
+ int readBufferSize = STRIPED_READ_BUFFER_SIZE;
+ bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
+ readBufferSize - readBufferSize % bytesPerChecksum;
+ } else {
+ assert blockReader.getDataChecksum().equals(checksum);
+ }
+ }
+
+ // assume liveIndices is not ordered.
+ private void getTargetIndices() {
+ BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
+ for (int i = 0; i < sources.length; i++) {
+ bitset.set(liveIndices[i]);
+ }
+ int m = 0;
+ for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) {
+ if (!bitset.get(i)) {
+ targetIndices[m++] = (short)i;
+ }
+ }
+ }
+
+ /**
+ * Read the minimum amount of striped buffer data required by recovery.
+ * The <code>success</code> list will be updated after the read.
+ *
+ * Initially we only read from <code>dataBlkNum</code> sources; on timeout
+ * or failure of a source, we try to schedule a read from a new source.
+ */
+ private int readMinimumStripedData4Recovery(int[] success) {
+
+ BitSet used = new BitSet(sources.length);
+ for (int i = 0; i < dataBlkNum; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, success[i]);
+ used.set(success[i]);
+ }
+
+ int nsuccess = 0;
+ while (!futures.isEmpty()) {
+ try {
+ StripedReadResult result =
+ StripedBlockUtil.getNextCompletedStripedRead(
+ readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
+ if (result.state == StripedReadResult.SUCCESSFUL) {
+ success[nsuccess++] = result.index;
+ if (nsuccess >= dataBlkNum) {
+ // cancel remaining reads if we read successfully from minimum
+ // number of sources required for recovery.
+ cancelReads(futures.keySet());
+ futures.clear();
+ break;
+ }
+ } else if (result.state == StripedReadResult.FAILED) {
+ // If read failed for some source, we should not use it anymore
+ // and schedule read from a new source.
+ StripedReader failedReader = stripedReaders.get(result.index);
+ closeBlockReader(failedReader.blockReader);
+ failedReader.blockReader = null;
+ scheduleNewRead(used);
+ } else if (result.state == StripedReadResult.TIMEOUT) {
+ // If timeout, we also schedule a new read.
+ scheduleNewRead(used);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Read data interrupted.", e);
+ break;
+ }
+ }
+
+ return nsuccess;
+ }
+
+ /**
+ * Return true if we need to do encoding to recover the missing striped blocks.
+ */
+ private boolean shouldEncode(int[] success) {
+ for (int i = 0; i < success.length; i++) {
+ if (stripedReaders.get(success[i]).index >= dataBlkNum) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void paddingBufferToLen(ByteBuffer buffer, int len) {
+ int toPadding = len - buffer.position();
+ for (int i = 0; i < toPadding; i++) {
+ buffer.put((byte) 0);
+ }
+ }
+
+ // Initialize encoder
+ private void initEncoderIfNecessary() {
+ if (encoder == null) {
+ encoder = newEncoder();
+ encoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+ }
+ }
+
+ // Initialize decoder
+ private void initDecoderIfNecessary() {
+ if (decoder == null) {
+ decoder = newDecoder();
+ decoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+ }
+ }
+
+ private void recoverTargets(int[] success, boolean[] targetsStatus,
+ int toRecoverLen) {
+ if (shouldEncode(success)) {
+ initEncoderIfNecessary();
+ ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum];
+ ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum];
+ for (int i = 0; i < dataBlkNum; i++) {
+ StripedReader reader = stripedReaders.get(i);
+ ByteBuffer buffer = reader.buffer;
+ paddingBufferToLen(buffer, toRecoverLen);
+ dataBuffers[i] = (ByteBuffer)buffer.flip();
+ }
+ for (int i = dataBlkNum; i < stripedReaders.size(); i++) {
+ StripedReader reader = stripedReaders.get(i);
+ parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer);
+ }
+ for (int i = 0; i < targets.length; i++) {
+ parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i];
+ }
+ for (int i = 0; i < parityBlkNum; i++) {
+ if (parityBuffers[i] == null) {
+ parityBuffers[i] = ByteBuffer.allocate(toRecoverLen);
+ } else {
+ parityBuffers[i].limit(toRecoverLen);
+ }
+ }
+ encoder.encode(dataBuffers, parityBuffers);
+ } else {
+ /////////// TODO: wait for HADOOP-11847 /////////////
+ ////////// The current decode method always tries to decode parityBlkNum data blocks. ////////////
+ initDecoderIfNecessary();
+ ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+ for (int i = 0; i < success.length; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ ByteBuffer buffer = reader.buffer;
+ paddingBufferToLen(buffer, toRecoverLen);
+ int index = reader.index < dataBlkNum ?
+ reader.index + parityBlkNum : reader.index - dataBlkNum;
+ inputs[index] = (ByteBuffer)buffer.flip();
+ }
+ int[] indices4Decode = new int[parityBlkNum];
+ int m = 0;
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (inputs[i] == null) {
+ inputs[i] = ByteBuffer.allocate(toRecoverLen);
+ indices4Decode[m++] = i;
+ }
+ }
+ ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
+ m = 0;
+ // targetIndices is subset of indices4Decode
+ for (int i = 0; i < parityBlkNum; i++) {
+ if (m < targetIndices.length &&
+ (indices4Decode[i] - parityBlkNum) == targetIndices[m]) {
+ outputs[i] = targetBuffers[m++];
+ outputs[i].limit(toRecoverLen);
+ } else {
+ outputs[i] = ByteBuffer.allocate(toRecoverLen);
+ }
+ }
+
+ decoder.decode(inputs, indices4Decode, outputs);
+
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+ long remaining = blockLen - positionInBlock;
+ if (remaining < 0) {
+ targetBuffers[i].limit(0);
+ } else if (remaining < toRecoverLen) {
+ targetBuffers[i].limit((int)remaining);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule a read from a new source: first try a source that has not been
+ * initialized, then an unused source from this round, bypassing failed sources.
+ */
+ private void scheduleNewRead(BitSet used) {
+ StripedReader reader = null;
+ int m = stripedReaders.size();
+ while (m < sources.length && reader == null) {
+ reader = new StripedReader(liveIndices[m]);
+ BlockReader blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]);
+ stripedReaders.add(reader);
+ if (blockReader != null) {
+ assert blockReader.getDataChecksum().equals(checksum);
+ reader.blockReader = blockReader;
+ reader.buffer = ByteBuffer.allocate(bufferSize);
+ } else {
+ m++;
+ reader = null;
+ }
+ }
+
+ for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
+ StripedReader r = stripedReaders.get(i);
+ if (r.blockReader != null && !used.get(i)) {
+ closeBlockReader(r.blockReader);
+ r.blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), positionInBlock,
+ sources[i]);
+ if (r.blockReader != null) {
+ m = i;
+ reader = r;
+ }
+ }
+ }
+
+ if (reader != null) {
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, m);
+ used.set(m);
+ }
+ }
+
+ // cancel all reads.
+ private void cancelReads(Collection<Future<Void>> futures) {
+ for (Future<Void> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ private Callable<Void> readFromBlock(final BlockReader reader,
+ final ByteBuffer buf) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ actualReadFromBlock(reader, buf);
+ return null;
+ } catch (IOException e) {
+ LOG.info(e.getMessage());
+ throw e;
+ }
+ }
+
+ };
+ }
+
+ /**
+ * Read bytes from block
+ */
+ private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
+ throws IOException {
+ int len = buf.remaining();
+ int n = 0;
+ while (n < len) {
+ int nread = reader.read(buf);
+ if (nread <= 0) {
+ break;
+ }
+ n += nread;
+ }
+ }
+
+ // close block reader
+ private void closeBlockReader(BlockReader blockReader) {
+ try {
+ if (blockReader != null) {
+ blockReader.close();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+ return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+ datanode.getDnConf().getConnectToDnViaHostname()));
+ }
+
+ private BlockReader newBlockReader(final ExtendedBlock block,
+ long startOffset, DatanodeInfo dnInfo) {
+ try {
+ InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
+ Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
+ block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+ /*
+ * This can be further improved if the replica is local: we could then
+ * read directly from the DN after checking that the replica is in the
+ * FINALIZED state. Notice we should not use short-circuit local reads,
+ * which require domain-socket config on UNIX or legacy config on Windows.
+ */
+ return RemoteBlockReader2.newBlockReader(
+ "dummy", block, blockToken, startOffset, block.getNumBytes(), true,
+ "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
+ null, cachingStrategy);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ Socket sock = null;
+ final int socketTimeout = datanode.getDnConf().getSocketTimeout();
+ try {
+ sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ NetUtils.connect(sock, addr, socketTimeout);
+ peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(),
+ sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
+ blockToken, datanodeId);
+ peer.setReadTimeout(socketTimeout);
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ IOUtils.cleanup(LOG, peer);
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Send data to targets
+ */
+ private int transferData2Targets(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ boolean success = false;
+ try {
+ ByteBuffer buffer = targetBuffers[i];
+
+ if (buffer.remaining() == 0) {
+ continue;
+ }
+
+ checksum.calculateChunkedSums(
+ buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
+
+ int ckOff = 0;
+ while (buffer.remaining() > 0) {
+ DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
+ int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
+ int toWrite = buffer.remaining() > maxBytesToPacket ?
+ maxBytesToPacket : buffer.remaining();
+ int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
+ packet.writeChecksum(checksumBuf, ckOff, ckLen);
+ ckOff += ckLen;
+ packet.writeData(buffer, toWrite);
+
+ // Send packet
+ packet.writeTo(targetOutputStreams[i]);
+
+ blockOffset4Targets[i] += toWrite;
+ nsuccess++;
+ success = true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ targetsStatus[i] = success;
+ }
+ }
+ return nsuccess;
+ }
+
+ /**
+ * clear all buffers
+ */
+ private void clearBuffers() {
+ for (StripedReader stripedReader : stripedReaders) {
+ if (stripedReader.buffer != null) {
+ stripedReader.buffer.clear();
+ }
+ }
+
+ for (int i = 0; i < targetBuffers.length; i++) {
+ if (targetBuffers[i] != null) {
+ cleanBuffer(targetBuffers[i]);
+ }
+ }
+ }
+
+ private ByteBuffer cleanBuffer(ByteBuffer buffer) {
+ Arrays.fill(buffer.array(), (byte) 0);
+ return (ByteBuffer)buffer.clear();
+ }
+
+ // send an empty packet to mark the end of the block
+ private void endTargetBlocks(boolean[] targetsStatus) {
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ try {
+ DFSPacket packet = new DFSPacket(packetBuf, 0,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
+ packet.writeTo(targetOutputStreams[i]);
+ targetOutputStreams[i].flush();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize output/input streams for transferring data to target
+ * and send create block request.
+ */
+ private int initTargetStreams(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ Socket socket = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ boolean success = false;
+ try {
+ InetSocketAddress targetAddr =
+ getSocketAddress4Transfer(targets[i]);
+ socket = datanode.newSocket();
+ NetUtils.connect(socket, targetAddr,
+ datanode.getDnConf().getSocketTimeout());
+ socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
+
+ ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
+ Token<BlockTokenIdentifier> blockToken =
+ datanode.getBlockAccessToken(block,
+ EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+ long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
+ OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(socket);
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
+ socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
+
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsServerConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
+
+ DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
+ new Sender(out).writeBlock(block, targetStorageTypes[i],
+ blockToken, "", new DatanodeInfo[]{targets[i]},
+ new StorageType[]{targetStorageTypes[i]}, source,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
+ checksum, cachingStrategy, false, false, null);
+
+ targetSockets[i] = socket;
+ targetOutputStreams[i] = out;
+ targetInputStreams[i] = in;
+ nsuccess++;
+ success = true;
+ } catch (Throwable e) {
+ LOG.warn(e.getMessage());
+ } finally {
+ if (!success) {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(socket);
+ }
+ }
+ targetsStatus[i] = success;
+ }
+ return nsuccess;
+ }
+ }
+
+ private class StripedReader {
+ short index;
+ BlockReader blockReader;
+ ByteBuffer buffer;
+
+ public StripedReader(short index) {
+ this.index = index;
+ }
}
}
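
As a standalone illustration of step2 of the recovery loop described in the ReconstructAndTransferBlock javadoc above: when all surviving sources are data blocks, the missing parity is rebuilt with the RS raw encoder. This is a hedged sketch against the RSRawEncoder API as used in this patch (initialize(numDataUnits, numParityUnits, chunkSize) plus encode over ByteBuffer arrays); the class name and the toy data are illustrative only, not part of the commit.

import java.nio.ByteBuffer;

import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

class EncodeStepSketch {
  public static void main(String[] args) {
    int dataBlkNum = 6;
    int parityBlkNum = 3;
    int bufferSize = 64 * 1024;

    // One buffer per data block, filled and flipped, as recoverTargets() does.
    ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum];
    for (int i = 0; i < dataBlkNum; i++) {
      dataBuffers[i] = ByteBuffer.allocate(bufferSize);
      while (dataBuffers[i].hasRemaining()) {
        dataBuffers[i].put((byte) (i + 1));
      }
      dataBuffers[i].flip();
    }

    // One output buffer per parity block; the recovered parity lands here.
    ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum];
    for (int i = 0; i < parityBlkNum; i++) {
      parityBuffers[i] = ByteBuffer.allocate(bufferSize);
    }

    RSRawEncoder encoder = new RSRawEncoder();
    encoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
    encoder.encode(dataBuffers, parityBuffers);
    // The recovered parity buffers would then be packetized and transferred
    // to the target datanodes (step3 of the recovery loop).
  }
}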
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 24d4bfb..45bbf6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.util;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -77,10 +78,8 @@ public class StripedBlockUtil {
public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int dataBlkNum,
int idxInBlockGroup) {
- final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
- blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
- blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(),
- cellSize, dataBlkNum, idxInBlockGroup));
+ final ExtendedBlock blk = constructInternalBlock(
+ bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
@@ -91,6 +90,44 @@ public class StripedBlockUtil {
}
/**
+ * This method creates an internal {@link ExtendedBlock} at the given index
+ * of a block group.
+ */
+ public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
+ int cellSize, int dataBlkNum, int idxInBlockGroup) {
+ ExtendedBlock block = new ExtendedBlock(blockGroup);
+ block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
+ block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(),
+ cellSize, dataBlkNum, idxInBlockGroup));
+ return block;
+ }
+
+ /**
+ * This method creates an internal {@link ExtendedBlock} at the given index
+ * of a block group, for both data and parity block.
+ */
+ public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup,
+ int cellSize, int dataBlkNum, int idxInBlockGroup) {
+ ExtendedBlock block = new ExtendedBlock(blockGroup);
+ block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
+ block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize,
+ dataBlkNum, idxInBlockGroup));
+ return block;
+ }
+
+ /**
+ * Returns an internal block length at the given index of a block group,
+ * for both data and parity block.
+ */
+ public static long getStripedBlockLength(long numBytes, int cellSize,
+ int dataBlkNum, int idxInBlockGroup) {
+ // parity block length is the same as the first striped block length.
+ return StripedBlockUtil.getInternalBlockLength(
+ numBytes, cellSize, dataBlkNum,
+ idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0);
+ }
+
+ /**
* Get the size of an internal block at the given index of a block group
*
* @param dataSize Size of the block group only counting data blocks
@@ -208,8 +245,8 @@ public class StripedBlockUtil {
* @throws InterruptedException
*/
public static StripedReadResult getNextCompletedStripedRead(
- CompletionService<Void> readService, Map<Future<Void>,
- Integer> futures, final long threshold) throws InterruptedException {
+ CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+ final long threshold) throws InterruptedException {
Preconditions.checkArgument(!futures.isEmpty());
Preconditions.checkArgument(threshold > 0);
Future<Void> future = null;
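
The new getStripedBlockLength() helper differs from getInternalBlockLength() only in how parity indices are handled: a parity block is always as long as the first (longest) data block. A small hedged example of calling it; the block-group size and schema values below are made up for illustration.

import org.apache.hadoop.hdfs.util.StripedBlockUtil;

class StripedBlockLengthExample {
  public static void main(String[] args) {
    int cellSize = 64 * 1024;
    int dataBlkNum = 6;
    int parityBlkNum = 3;
    long blockGroupBytes = 10L * cellSize + 123; // not a full stripe

    // Print the internal block length for every data and parity index.
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      long len = StripedBlockUtil.getStripedBlockLength(
          blockGroupBytes, cellSize, dataBlkNum, i);
      System.out.println("internal block " + i + " length = " + len);
    }
  }
}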
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7f0730b..f802128 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2312,11 +2312,11 @@
</description>
</property>
- <property>
- <name>dfs.datanode.block-pinning.enabled</name>
- <value>false</value>
- <description>Whether pin blocks on favored DataNode.</description>
- </property>
+<property>
+ <name>dfs.datanode.block-pinning.enabled</name>
+ <value>false</value>
+ <description>Whether pin blocks on favored DataNode.</description>
+</property>
<property>
<name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
@@ -2354,4 +2354,25 @@
</description>
</property>
+<property>
+ <name>dfs.datanode.stripedread.threshold.millis</name>
+ <value>5000</value>
+ <description>datanode striped read threshold in milliseconds.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.stripedread.threads</name>
+ <value>20</value>
+ <description>datanode striped read thread pool size.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.stripedread.buffer.size</name>
+ <value>262144</value>
+ <description>datanode striped read buffer size.
+ </description>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb8ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
new file mode 100644
index 0000000..b4f05d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRecoverStripedFile {
+ public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
+
+ private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
+ private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+ private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private static final int blockSize = cellSize * 3;
+ private static final int groupSize = dataBlkNum + parityBlkNum;
+ private static final int dnNum = groupSize + parityBlkNum;
+
+ private MiniDFSCluster cluster;
+ private Configuration conf;
+ private DistributedFileSystem fs;
+ // Map: DatanodeID -> datanode index in cluster
+ private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+
+ @Before
+ public void setup() throws IOException {
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
+ cluster.waitActive();
+
+ fs = cluster.getFileSystem();
+ fs.getClient().createErasureCodingZone("/", null);
+
+ List<DataNode> datanodes = cluster.getDataNodes();
+ for (int i = 0; i < dnNum; i++) {
+ dnMap.put(datanodes.get(i).getDatanodeId(), i);
+ }
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testRecoverOneParityBlock() throws Exception {
+ int fileLen = 10 * blockSize + blockSize/10;
+ assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+ }
+
+ @Test(timeout = 120000)
+ public void testRecoverThreeParityBlocks() throws Exception {
+ int fileLen = 3 * blockSize + blockSize/10;
+ assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+ }
+
+ @Test(timeout = 120000)
+ public void testRecoverThreeDataBlocks() throws Exception {
+ int fileLen = 3 * blockSize + blockSize/10;
+ assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+ }
+
+ @Test(timeout = 120000)
+ public void testRecoverOneDataBlock() throws Exception {
+ // TODO: wait for HADOOP-11847
+ //int fileLen = 10 * blockSize + blockSize/10;
+ //assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+ }
+
+ @Test(timeout = 120000)
+ public void testRecoverAnyBlocks() throws Exception {
+ // TODO: wait for HADOOP-11847
+ //int fileLen = 3 * blockSize + blockSize/10;
+ //assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+ }
+
+ /**
+ * Test recovery of the file's blocks.
+ * 1. Check that the replica is recovered on the target datanode,
+ * and verify the block replica's length, generation stamp and content.
+ * 2. Read the file and verify its content.
+ */
+ private void assertFileBlocksRecovery(String fileName, int fileLen,
+ int recovery, int toRecoverBlockNum) throws Exception {
+ if (recovery != 0 && recovery != 1 && recovery != 2) {
+ Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
+ + "1 is to recovery data blocks, 2 is any.");
+ }
+ if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
+ Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
+ }
+
+ Path file = new Path(fileName);
+
+ testCreateStripedFile(file, fileLen);
+
+ LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+ assertEquals(locatedBlocks.getFileLength(), fileLen);
+
+ LocatedStripedBlock lastBlock =
+ (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+
+ DatanodeInfo[] storageInfos = lastBlock.getLocations();
+ int[] indices = lastBlock.getBlockIndices();
+
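+ // Record which datanodes already hold a replica of the last block group,
+ // so new recovery targets can later be chosen from the remaining nodes.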
+ BitSet bitset = new BitSet(dnNum);
+ for (DatanodeInfo storageInfo : storageInfos) {
+ bitset.set(dnMap.get(storageInfo));
+ }
+
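+ // Choose the block indices to kill: parity blocks when recovery == 0,
+ // data blocks when recovery == 1, and any blocks when recovery == 2.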
+ int[] toDead = new int[toRecoverBlockNum];
+ int n = 0;
+ for (int i = 0; i < indices.length; i++) {
+ if (n < toRecoverBlockNum) {
+ if (recovery == 0) {
+ if (indices[i] >= dataBlkNum) {
+ toDead[n++] = i;
+ }
+ } else if (recovery == 1) {
+ if (indices[i] < dataBlkNum) {
+ toDead[n++] = i;
+ }
+ } else {
+ toDead[n++] = i;
+ }
+ } else {
+ break;
+ }
+ }
+
+ DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
+ int[] deadDnIndices = new int[toRecoverBlockNum];
+ ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
+ File[] replicas = new File[toRecoverBlockNum];
+ File[] metadatas = new File[toRecoverBlockNum];
+ byte[][] replicaContents = new byte[toRecoverBlockNum][];
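+ // Record each to-be-killed replica's block, files and content before
+ // shutting down the datanode that holds it.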
+ for (int i = 0; i < toRecoverBlockNum; i++) {
+ dataDNs[i] = storageInfos[toDead[i]];
+ deadDnIndices[i] = dnMap.get(dataDNs[i]);
+
+ // Check the block replica file on the datanode before it is shut down.
+ blocks[i] = StripedBlockUtil.constructStripedBlock(
+ lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+ replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
+ metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
+ // The replica length on the datanode should match the expected striped block length.
+ assertEquals(replicas[i].length(),
+ StripedBlockUtil.getStripedBlockLength(
+ lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+ assertTrue(metadatas[i].getName().
+ endsWith(blocks[i].getGenerationStamp() + ".meta"));
+ replicaContents[i] = readReplica(replicas[i]);
+ }
+
+ try {
+ DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
+ for (int i = 0; i < toRecoverBlockNum; i++) {
+ /*
+ * Kill the datanode that holds one of the replicas.
+ * We need to make sure the NameNode marks it dead: clear its last
+ * update time and trigger the NameNode to check heartbeats.
+ */
+ DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
+ dn.shutdown();
+ dnIDs[i] = dn.getDatanodeId();
+ }
+ setDataNodesDead(dnIDs);
+
+ // Check the locatedBlocks of the file again
+ locatedBlocks = getLocatedBlocks(file);
+ lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+ storageInfos = lastBlock.getLocations();
+ assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+
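+ // The candidate recovery targets are the datanodes that did not hold
+ // a replica of the last block group before the failures.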
+ int[] targetDNs = new int[dnNum - groupSize];
+ n = 0;
+ for (int i = 0; i < dnNum; i++) {
+ if (!bitset.get(i)) { // this datanode does not hold a replica of the block
+ targetDNs[n++] = i;
+ }
+ }
+
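+ // Wait until the killed replicas have been reconstructed on other datanodes.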
+ waitForRecoveryFinished(file);
+
+ targetDNs = sortTargetsByReplicas(blocks, targetDNs);
+
+ // Check the replica on the new target node.
+ for (int i = 0; i < toRecoverBlockNum; i++) {
+ File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+ File metadataAfterRecovery =
+ cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
+ assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+ assertTrue(metadataAfterRecovery.getName().
+ endsWith(blocks[i].getGenerationStamp() + ".meta"));
+ byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
+
+ Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
+ }
+ } finally {
+ for (int i = 0; i < toRecoverBlockNum; i++) {
+ restartDataNode(toDead[i]);
+ }
+ cluster.waitActive();
+ }
+ fs.delete(file, true);
+ }
+
+ private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
+ for (DatanodeID dn : dnIDs) {
+ DatanodeDescriptor dnd =
+ NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
+ DFSTestUtil.setDatanodeDead(dnd);
+ }
+
+ BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
+ }
+
+ private void restartDataNode(int dn) {
+ try {
+ cluster.restartDataNode(dn, true, true);
+ } catch (IOException e) {
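+ // Best-effort cleanup: ignore failures while restarting the datanode.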
+ }
+ }
+
+ private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
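+ // For each recovered block, find the target datanode that now holds its replica.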
+ int[] result = new int[blocks.length];
+ for (int i = 0; i < blocks.length; i++) {
+ result[i] = -1;
+ for (int j = 0; j < targetDNs.length; j++) {
+ if (targetDNs[j] != -1) {
+ File replica = cluster.getBlockFile(targetDNs[j], blocks[i]);
+ if (replica != null) {
+ result[i] = targetDNs[j];
+ targetDNs[j] = -1;
+ break;
+ }
+ }
+ }
+ if (result[i] == -1) {
+ Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId());
+ }
+ }
+ return result;
+ }
+
+ private byte[] readReplica(File replica) throws IOException {
+ int length = (int)replica.length();
+ ByteArrayOutputStream content = new ByteArrayOutputStream(length);
+ FileInputStream in = new FileInputStream(replica);
+ try {
+ byte[] buffer = new byte[1024];
+ int total = 0;
+ while (total < length) {
+ int n = in.read(buffer);
+ if (n <= 0) {
+ break;
+ }
+ content.write(buffer, 0, n);
+ total += n;
+ }
+ if (total < length) {
+ Assert.fail("Failed to read all content of replica");
+ }
+ return content.toByteArray();
+ } finally {
+ in.close();
+ }
+ }
+
+ private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception {
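+ // Poll the NameNode until the last block group again reports a full set
+ // of locations, i.e. the missing blocks have been reconstructed.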
+ final int ATTEMPTS = 60;
+ for (int i = 0; i < ATTEMPTS; i++) {
+ LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+ LocatedStripedBlock lastBlock =
+ (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+ DatanodeInfo[] storageInfos = lastBlock.getLocations();
+ if (storageInfos.length >= groupSize) {
+ return locatedBlocks;
+ }
+ Thread.sleep(1000);
+ }
+ throw new IOException("Timed out waiting for EC block recovery.");
+ }
+
+ private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
+ return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
+ }
+
+ private void testCreateStripedFile(Path file, int dataLen)
+ throws IOException {
+ final byte[] data = new byte[dataLen];
+ DFSUtil.getRandom().nextBytes(data);
+ writeContents(file, data);
+ }
+
+ void writeContents(Path file, byte[] contents)
+ throws IOException {
+ FSDataOutputStream out = fs.create(file);
+ try {
+ out.write(contents, 0, contents.length);
+ } finally {
+ out.close();
+ }
+ }
+}