Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:57:40 UTC
[06/50] hadoop git commit: HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao
HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb386505
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb386505
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb386505
Branch: refs/heads/HDFS-7285
Commit: fb386505a86d739e034664b0b58236ed9dded08c
Parents: c284ac0
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue Apr 7 11:20:13 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:11:40 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/protocol/LocatedBlock.java | 4 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 55 +++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 80 +++-
.../hadoop/hdfs/DFSStripedInputStream.java | 367 +++++++++++++++++++
.../hdfs/protocol/LocatedStripedBlock.java | 5 +
.../blockmanagement/BlockInfoStriped.java | 6 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 92 ++++-
.../apache/hadoop/hdfs/TestReadStripedFile.java | 304 +++++++++++++++
.../namenode/TestRecoverStripedBlocks.java | 88 +----
10 files changed, 896 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 4e8f202..a9596bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -203,4 +203,8 @@ public class LocatedBlock {
+ "; locs=" + Arrays.asList(locs)
+ "}";
}
+
+ public boolean isStriped() {
+ return false;
+ }
}
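
The new isStriped() hook defaults to false and is overridden to true in LocatedStripedBlock further down, so callers such as the pread path can branch on block layout without instanceof checks. A minimal standalone sketch of the pattern (class names here are illustrative, not the real HDFS types):

class PlainBlock {
  // Contiguous (replicated) layout is the default.
  boolean isStriped() { return false; }
}

class GroupBlock extends PlainBlock {
  // A striped block group overrides the flag.
  @Override
  boolean isStriped() { return true; }
}

public class IsStripedDemo {
  public static void main(String[] args) {
    for (PlainBlock b : new PlainBlock[]{new PlainBlock(), new GroupBlock()}) {
      // e.g. hedged reads are skipped when isStriped() is true
      System.out.println(b.getClass().getSimpleName() + ": " + b.isStriped());
    }
  }
}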
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 09ffd95..5ad043b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -237,6 +237,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
+ private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final Sampler<?> traceSampler;
public DfsClientConf getConf() {
@@ -373,6 +374,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
}
+ int numThreads = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+ if (numThreads <= 0) {
+ LOG.warn("The value of "
+ + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+ + " must be greater than 0. The current setting is " + numThreads
+ + ". Reset it to the default value "
+ + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+ numThreads =
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
+ }
+ this.initThreadsNumForStripedReads(numThreads);
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
@@ -3153,11 +3167,52 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
LOG.debug("Using hedged reads; pool threads=" + num);
}
}
+
+ /**
+ * Create thread pool for parallel reading in striped layout,
+ * STRIPED_READ_THREAD_POOL, if it does not already exist.
+ * @param num Number of threads for striped reads thread pool.
+ */
+ private void initThreadsNumForStripedReads(int num) {
+ assert num > 0;
+ if (STRIPED_READ_THREAD_POOL != null) {
+ return;
+ }
+ synchronized (DFSClient.class) {
+ if (STRIPED_READ_THREAD_POOL == null) {
+ STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedRead-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+ LOG.info("Execution for striped reading rejected, "
+ + "Executing in current thread");
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+ }
+ }
ThreadPoolExecutor getHedgedReadsThreadPool() {
return HEDGED_READ_THREAD_POOL;
}
+ ThreadPoolExecutor getStripedReadsThreadPool() {
+ return STRIPED_READ_THREAD_POOL;
+ }
+
boolean isHedgedReadsEnabled() {
return (HEDGED_READ_THREAD_POOL != null) &&
HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
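
The initialization above combines three standard executor techniques: lazy double-checked locking on a static volatile field, a SynchronousQueue so each task is handed directly to a thread, and CallerRunsPolicy as the overflow behavior. A self-contained sketch of the same pattern, with no HDFS dependencies (all names illustrative):

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StripedPoolDemo {
  // volatile makes the double-checked locking below safe
  private static volatile ThreadPoolExecutor pool;

  static ThreadPoolExecutor getPool(final int maxThreads) {
    if (pool == null) {                      // first check, lock-free
      synchronized (StripedPoolDemo.class) {
        if (pool == null) {                  // second check, under lock
          final AtomicInteger idx = new AtomicInteger(0);
          pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(),  // direct hand-off, no queueing
              r -> {
                Thread t = new Thread(r, "stripedRead-" + idx.getAndIncrement());
                t.setDaemon(true);               // like Daemon.DaemonFactory
                return t;
              },
              new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs in caller
          pool.allowCoreThreadTimeOut(true);
        }
      }
    }
    return pool;
  }

  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor p = getPool(18);
    p.submit(() -> System.out.println(Thread.currentThread().getName()));
    p.shutdown();
    p.awaitTermination(5, TimeUnit.SECONDS);
  }
}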
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 2dbfbd0..42c5f94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -660,7 +660,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.reject-unresolved-dn-topology-mapping";
public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
false;
-
+
+ public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE =
+ "dfs.client.striped.read.threadpool.size";
+ // With the default 3+2 schema, each normal read can span 3 DNs, so this
+ // default value accommodates 6 concurrent read streams (6 * 3 = 18)
+ public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18;
+
// Slow io warning log threshold settings for dfsclient and datanode.
public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.datanode.slow.io.warning.threshold.ms";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3290223..79bbd54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -93,7 +94,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
- private final DFSClient dfsClient;
+ protected final DFSClient dfsClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private final String src;
private final boolean verifyChecksum;
@@ -440,7 +441,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@@ -712,7 +713,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
- private interface ReaderStrategy {
+ interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
}
@@ -1055,7 +1056,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
- private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+ protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1097,13 +1098,42 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
+ /**
+ * Used when reading contiguous blocks
+ */
private void actualGetFromOneDataNode(final DNAddrPair datanode,
long blockStartOffset, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
+ final int length = (int) (end - start + 1);
+ actualGetFromOneDataNode(datanode, getBlockAt(blockStartOffset), start,
+ end, buf, new int[]{offset}, new int[]{length}, corruptedBlockMap);
+ }
+
+ /**
+ * Read data from one DataNode.
+ * @param datanode the datanode from which to read data
+ * @param block the block to read
+ * @param startInBlk the start offset within the block
+ * @param endInBlk the end offset within the block
+ * @param buf the given byte array into which the data is read
+ * @param offsets the data may be read into multiple segments of the buf
+ * (when reading a striped block); this array indicates the
+ * offset of each buf segment
+ * @param lengths the length of each buf segment
+ * @param corruptedBlockMap map recording list of datanodes with corrupted
+ * block replica
+ */
+ void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long startInBlk, final long endInBlk,
+ byte[] buf, int[] offsets, int[] lengths,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
+ final int len = (int) (endInBlk - startInBlk + 1);
+ checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
@@ -1113,15 +1143,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
- int len = (int) (end - start + 1);
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
- int nread = reader.readAll(buf, offset, len);
- updateReadStatistics(readStatistics, nread, reader);
-
- if (nread != len) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + len + ", got " + nread);
+ for (int i = 0; i < offsets.length; i++) {
+ int nread = reader.readAll(buf, offsets[i], lengths[i]);
+ updateReadStatistics(readStatistics, nread, reader);
+ if (nread != lengths[i]) {
+ throw new IOException("truncated return from reader.read(): " +
+ "expected " + lengths[i] + ", got " + nread);
+ }
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@@ -1166,7 +1196,26 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+ * This method verifies that the read portions are valid and do not overlap
+ * with each other.
+ */
+ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+ Preconditions.checkArgument(offsets.length == lengths.length &&
+ offsets.length > 0);
+ int sum = 0;
+ for (int i = 0; i < lengths.length; i++) {
+ if (i > 0) {
+ int gap = offsets[i] - offsets[i - 1];
+ // make sure read portions do not overlap with each other
+ Preconditions.checkArgument(gap >= lengths[i - 1]);
+ }
+ sum += lengths[i];
+ }
+ Preconditions.checkArgument(sum == totalLen);
+ }
+
+ /**
+ * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
* 'hedged' read if the first read is taking longer than configured amount of
* time. We then wait on whichever read returns first.
*/
@@ -1385,10 +1434,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
- if (dfsClient.isHedgedReadsEnabled()) {
+ if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
@@ -1584,7 +1632,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
- private static final class DNAddrPair {
+ static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
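
The checkReadPortions helper added above enforces three invariants on a multi-segment read: offsets and lengths are equal-length, non-empty arrays; buffer segments are ascending and non-overlapping; and the lengths sum to the total read length. A dependency-free sketch of the same checks, exercised with hypothetical numbers:

public class CheckReadPortionsDemo {
  // Mirrors the three checks in checkReadPortions above: equal-length,
  // non-empty arrays; ascending, non-overlapping buffer segments; and
  // segment lengths that sum to the total read length.
  static void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
    if (offsets.length != lengths.length || offsets.length == 0) {
      throw new IllegalArgumentException("bad portion arrays");
    }
    int sum = 0;
    for (int i = 0; i < lengths.length; i++) {
      if (i > 0 && offsets[i] - offsets[i - 1] < lengths[i - 1]) {
        throw new IllegalArgumentException("portions overlap");
      }
      sum += lengths[i];
    }
    if (sum != totalLen) {
      throw new IllegalArgumentException("lengths do not cover totalLen");
    }
  }

  public static void main(String[] args) {
    // A 128-byte read from one internal block that lands in two buffer
    // segments (offsets 0 and 192), 64 bytes each; hypothetical numbers.
    checkReadPortions(new int[]{0, 192}, new int[]{64, 64}, 128);
    System.out.println("portions valid");
  }
}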
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
new file mode 100644
index 0000000..077b0f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+
+/******************************************************************************
+ * DFSStripedInputStream reads from striped block groups, illustrated below:
+ *
+ * | <- Striped Block Group -> |
+ * blk_0 blk_1 blk_2 <- A striped block group has
+ * | | | {@link #groupSize} blocks
+ * v v v
+ * +------+ +------+ +------+
+ * |cell_0| |cell_1| |cell_2| <- The logical read order should be
+ * +------+ +------+ +------+ cell_0, cell_1, ...
+ * |cell_3| |cell_4| |cell_5|
+ * +------+ +------+ +------+
+ * |cell_6| |cell_7| |cell_8|
+ * +------+ +------+ +------+
+ * |cell_9|
+ * +------+ <- A cell contains {@link #cellSize} bytes of data
+ *
+ * Three styles of read will eventually be supported:
+ * 1. Stateful read: TODO: HDFS-8033
+ * 2. pread without decode support
+ * This is implemented by calculating the portion of read from each block and
+ * issuing requests to each DataNode in parallel.
+ * 3. pread with decode support: TODO: will be supported after HDFS-7678
+ *****************************************************************************/
+public class DFSStripedInputStream extends DFSInputStream {
+ /**
+ * This method plans the read portion from each block in the stripe
+ * @param groupSize The size / width of the striping group
+ * @param cellSize The size of each striping cell
+ * @param startInBlk Starting offset in the striped block
+ * @param len Length of the read request
+ * @param bufOffset Initial offset in the result buffer
+ * @return array of {@link ReadPortion}, each representing the portion of I/O
+ * for an individual block in the group
+ */
+ @VisibleForTesting
+ static ReadPortion[] planReadPortions(final int groupSize,
+ final int cellSize, final long startInBlk, final int len, int bufOffset) {
+ ReadPortion[] results = new ReadPortion[groupSize];
+ for (int i = 0; i < groupSize; i++) {
+ results[i] = new ReadPortion();
+ }
+
+ // cellIdxInBlk is the index of the cell in the block
+ // E.g., cell_3 is the 2nd cell in blk_0
+ int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
+
+ // blkIdxInGroup is the index of the block in the striped block group
+ // E.g., blk_2 is the 3rd block in the group
+ final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
+ results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+ startInBlk % cellSize;
+ boolean crossStripe = false;
+ for (int i = 1; i < groupSize; i++) {
+ if (blkIdxInGroup + i >= groupSize && !crossStripe) {
+ cellIdxInBlk++;
+ crossStripe = true;
+ }
+ results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
+ cellSize * cellIdxInBlk;
+ }
+
+ int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+ results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+ results[blkIdxInGroup].lengths.add(firstCellLen);
+ results[blkIdxInGroup].readLength += firstCellLen;
+
+ int i = (blkIdxInGroup + 1) % groupSize;
+ for (int done = firstCellLen; done < len; done += cellSize) {
+ ReadPortion rp = results[i];
+ rp.offsetsInBuf.add(done + bufOffset);
+ final int readLen = Math.min(len - done, cellSize);
+ rp.lengths.add(readLen);
+ rp.readLength += readLen;
+ i = (i + 1) % groupSize;
+ }
+ return results;
+ }
+
+ /**
+ * This method parses a striped block group into individual blocks.
+ *
+ * @param bg The striped block group
+ * @param dataBlkNum the number of data blocks
+ * @return An array containing the blocks in the group
+ */
+ @VisibleForTesting
+ static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+ int dataBlkNum, int cellSize) {
+ int locatedBGSize = bg.getBlockIndices().length;
+ // TODO not considering missing blocks for now, only identify data blocks
+ LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
+ for (short i = 0; i < locatedBGSize; i++) {
+ final int idx = bg.getBlockIndices()[i];
+ if (idx < dataBlkNum && lbs[idx] == null) {
+ lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
+ }
+ }
+ return lbs;
+ }
+
+ private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+ int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
+ final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
+ blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
+ // TODO: fix the numBytes computation
+
+ return new LocatedBlock(blk,
+ new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+ new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+ new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+ bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+ null);
+ }
+
+
+ private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+
+ DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
+ throws IOException {
+ super(dfsClient, src, verifyChecksum);
+ DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+ }
+
+ @Override
+ public synchronized int read(final ByteBuffer buf) throws IOException {
+ throw new UnsupportedActionException("Stateful read is not supported");
+ }
+
+ @Override
+ public synchronized int read(final byte buf[], int off, int len)
+ throws IOException {
+ throw new UnsupportedActionException("Stateful read is not supported");
+ }
+
+ /**
+ * | <--------- LocatedStripedBlock (ID = 0) ---------> |
+ * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
+ * ^
+ * offset
+ * On a striped file, the super method {@link DFSInputStream#getBlockAt}
+ * treats a striped block group as a single {@link LocatedBlock} object,
+ * which includes the target offset in its range. This method adds the
+ * logic of:
+ * 1. Analyzing the index of the required block based on the offset
+ * 2. Parsing the block group to obtain the block location at that index
+ */
+ @Override
+ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
+ LocatedBlock lb = super.getBlockAt(blkStartOffset);
+ assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
+ "LocatedStripedBlock for a striped file";
+
+ int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
+ % groupSize);
+ // If indexing information is returned, iterate through the index array
+ // to find the entry for position idx in the group
+ LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+ int i = 0;
+ for (; i < lsb.getBlockIndices().length; i++) {
+ if (lsb.getBlockIndices()[i] == idx) {
+ break;
+ }
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+ }
+ return constructInternalBlock(lsb, i, cellSize, idx);
+ }
+
+ private LocatedBlock getBlockGroupAt(long offset) throws IOException {
+ return super.getBlockAt(offset);
+ }
+
+ /**
+ * Real implementation of pread.
+ */
+ @Override
+ protected void fetchBlockByteRange(long blockStartOffset, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ Map<Future<Void>, Integer> futures = new HashMap<>();
+ CompletionService<Void> stripedReadsService =
+ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+ int len = (int) (end - start + 1);
+
+ // Refresh the striped block group
+ LocatedBlock block = getBlockGroupAt(blockStartOffset);
+ assert block instanceof LocatedStripedBlock : "NameNode" +
+ " should return a LocatedStripedBlock for a striped file";
+ LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+
+ // Planning the portion of I/O for each shard
+ ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
+ len, offset);
+
+ // Parse group to get chosen DN location
+ LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
+
+ for (short i = 0; i < groupSize; i++) {
+ ReadPortion rp = readPortions[i];
+ if (rp.readLength <= 0) {
+ continue;
+ }
+ DatanodeInfo loc = blks[i].getLocations()[0];
+ StorageType type = blks[i].getStorageTypes()[0];
+ DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+ loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
+ Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
+ rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+ rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
+ Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
+ DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
+ futures.put(getFromDNRequest, (int) i);
+ }
+ while (!futures.isEmpty()) {
+ try {
+ waitNextCompletion(stripedReadsService, futures);
+ } catch (InterruptedException ie) {
+ // Ignore and retry
+ }
+ }
+ }
+
+ private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final byte[] buf, final int[] offsets, final int[] lengths,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final int hedgedReadId) {
+ final Span parentSpan = Trace.currentSpan();
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TraceScope scope =
+ Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
+ try {
+ actualGetFromOneDataNode(datanode, block, start,
+ end, buf, offsets, lengths, corruptedBlockMap);
+ } finally {
+ scope.close();
+ }
+ return null;
+ }
+ };
+ }
+
+ private void waitNextCompletion(CompletionService<Void> stripedReadsService,
+ Map<Future<Void>, Integer> futures) throws InterruptedException {
+ if (futures.isEmpty()) {
+ throw new InterruptedException("Futures already empty");
+ }
+ Future<Void> future = null;
+ try {
+ future = stripedReadsService.take();
+ future.get();
+ futures.remove(future);
+ } catch (ExecutionException | CancellationException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ }
+ throw new InterruptedException("let's retry");
+ }
+
+ public void setCellSize(int cellSize) {
+ this.cellSize = cellSize;
+ }
+
+ /**
+ * This class represents the portion of I/O associated with each block in the
+ * striped block group.
+ */
+ static class ReadPortion {
+ /**
+ * startOffsetInBlock
+ * |
+ * v
+ * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
+ * +------------------+------------------+----------------+
+ * | cell_0 | cell_3 | cell_6 | <- blk_0
+ * +------------------+------------------+----------------+
+ * _/ \_______________________
+ * | |
+ * v offsetsInBuf[0] v offsetsInBuf[1]
+ * +------------------------------------------------------+
+ * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
+ * | (partial) | (from blk_1 and blk_2) | |
+ * +------------------------------------------------------+
+ */
+ private long startOffsetInBlock = 0;
+ private long readLength = 0;
+ private final List<Integer> offsetsInBuf = new ArrayList<>();
+ private final List<Integer> lengths = new ArrayList<>();
+
+ int[] getOffsets() {
+ int[] offsets = new int[offsetsInBuf.size()];
+ for (int i = 0; i < offsets.length; i++) {
+ offsets[i] = offsetsInBuf.get(i);
+ }
+ return offsets;
+ }
+
+ int[] getLengths() {
+ int[] lens = new int[this.lengths.size()];
+ for (int i = 0; i < lens.length; i++) {
+ lens[i] = this.lengths.get(i);
+ }
+ return lens;
+ }
+
+ long getReadLength() {
+ return readLength;
+ }
+
+ long getStartOffsetInBlock() {
+ return startOffsetInBlock;
+ }
+ }
+}
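
The heart of planReadPortions and the overridden getBlockAt is the offset arithmetic that maps a logical position in the block group to (stripe, block index, offset within block). A worked example using deliberately tiny, hypothetical sizes (real cells are far larger):

public class StripeMathDemo {
  public static void main(String[] args) {
    // Hypothetical numbers: 3 data blocks, 4-byte cells.
    final int groupSize = 3;
    final int cellSize = 4;
    final long startInBlk = 10;  // logical offset into the block group

    // The same arithmetic as planReadPortions/getBlockAt above.
    int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize)); // stripe index
    int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);  // block in group
    long startOffsetInBlock = cellSize * cellIdxInBlk + startInBlk % cellSize;

    // Offset 10 lies in cell_2 (bytes 8..11), i.e. blk_2, stripe 0,
    // 2 bytes into that block's first cell.
    System.out.println("stripe=" + cellIdxInBlk + ", blk=" + blkIdxInGroup
        + ", offsetInBlock=" + startOffsetInBlock);
  }
}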
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 97e3a69..98614db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -65,4 +65,9 @@ public class LocatedStripedBlock extends LocatedBlock {
public int[] getBlockIndices() {
return this.blockIndices;
}
+
+ @Override
+ public boolean isStriped() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 4a85efb..20b0c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import java.io.DataOutput;
import java.io.IOException;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -203,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
// In case striped blocks, total usage by this striped blocks should
// be the total of data blocks and parity blocks because
// `getNumBytes` is the total of actual data block size.
- return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
- * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
+ return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1)
+ * BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes();
}
@Override
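
The renamed BLOCK_STRIPED_CELL_SIZE feeds the space accounting above: data is rounded up to whole stripes, each stripe charges one cell per parity block, and the raw data bytes are added back. A quick check of the formula with hypothetical numbers:

public class SpaceConsumedDemo {
  public static void main(String[] args) {
    // Hypothetical 6+3 schema with a 64-byte cell (illustrative size only).
    final long cellSize = 64;
    final long dataBlkNum = 6;
    final long parityBlkNum = 3;
    final long numBytes = 500;  // actual data bytes stored in the group

    // Same formula as spaceConsumed() above: round the data up to whole
    // stripes, charge one cell per parity block per stripe, add the data.
    long stripes = (numBytes - 1) / (dataBlkNum * cellSize) + 1;
    long consumed = stripes * cellSize * parityBlkNum + numBytes;

    // 500 bytes covers ceil(500 / 384) = 2 stripes, so parity adds
    // 2 * 3 * 64 = 384 bytes: prints 884.
    System.out.println(consumed);
  }
}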
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 4fb9ce4..2c11179 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,12 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -107,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -123,8 +129,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -132,7 +140,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
@@ -152,12 +159,8 @@ import org.apache.log4j.Level;
import org.junit.Assume;
import org.mockito.internal.util.reflection.Whitebox;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -1807,4 +1810,77 @@ public class DFSTestUtil {
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
return reports;
}
+
+ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+ int numBlocks, int numStripesPerBlk) throws Exception {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ dfs.mkdirs(dir);
+ dfs.getClient().createErasureCodingZone(dir.toString());
+
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1); // create an empty file
+
+ FSNamesystem ns = cluster.getNamesystem();
+ FSDirectory fsdir = ns.getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+ ExtendedBlock previous = null;
+ for (int i = 0; i < numBlocks; i++) {
+ Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
+ file.toString(), fileNode, dfs.getClient().getClientName(),
+ previous, numStripesPerBlk);
+ previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+ }
+
+ dfs.getClient().namenode.complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+
+ static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
+ FSNamesystem ns, String file, INodeFile fileNode, String clientName,
+ ExtendedBlock previous, int numStripes) throws Exception {
+ fs.getClient().namenode.addBlock(file, clientName, previous, null,
+ fileNode.getId(), null);
+
+ final BlockInfo lastBlock = fileNode.getLastBlock();
+ final int groupSize = fileNode.getBlockReplication();
+ // 1. RECEIVING_BLOCK IBR
+ int i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++,
+ numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
+ return lastBlock;
+ }
}
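
createBlock above gives the internal blocks IDs contiguous with the group ID (lastBlock.getBlockId() + i), which is what lets BlockIdManager.getBlockIndex recover each block's index in the tests below. A small sketch of that arithmetic; the 4-bit index mask is an assumption for illustration, not taken from this patch:

public class BlockIdMathDemo {
  public static void main(String[] args) {
    // Striped block-group IDs reserve the low bits for the internal block
    // index, so internal IDs are groupId + i (dummy ID from the test below).
    final long groupId = -1048576;
    final int dataBlkNum = 6;
    for (int i = 0; i < dataBlkNum; i++) {
      long internalId = groupId + i;
      // Recovering the index from the low bits (4-bit mask assumed here).
      long idx = internalId & 0xF;
      System.out.println("internalId=" + internalId + " idx=" + idx);
    }
  }
}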
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
new file mode 100644
index 0000000..0032bdd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestReadStripedFile {
+
+ public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
+
+ private MiniDFSCluster cluster;
+ private Configuration conf = new Configuration();
+ private DistributedFileSystem fs;
+ private final Path dirPath = new Path("/striped");
+ private Path filePath = new Path(dirPath, "file");
+ private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
+ private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final int NUM_STRIPE_PER_BLOCK = 2;
+ private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
+
+ @Before
+ public void setup() throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+ SimulatedFSDataset.setFactory(conf);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
+ .build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void testPlanReadPortions(int startInBlk, int length,
+ int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+ int[][] bufferOffsets, int[][] bufferLengths) {
+ ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+ CELLSIZE, startInBlk, length, bufferOffset);
+ assertEquals(GROUP_SIZE, results.length);
+
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ assertEquals(readLengths[i], results[i].getReadLength());
+ assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+ final int[] bOffsets = results[i].getOffsets();
+ assertArrayEquals(bufferOffsets[i], bOffsets);
+ final int[] bLengths = results[i].getLengths();
+ assertArrayEquals(bufferLengths[i], bLengths);
+ }
+ }
+
+ /**
+ * Test {@link DFSStripedInputStream#planReadPortions}
+ */
+ @Test
+ public void testPlanReadPortions() {
+ /**
+ * start block offset is 0, read cellSize - 10
+ */
+ testPlanReadPortions(0, CELLSIZE - 10, 0,
+ new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 3 * cellSize
+ */
+ testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 0, read cellSize + 10
+ */
+ testPlanReadPortions(0, CELLSIZE + 10, 0,
+ new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+ */
+ testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+ new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+ new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+ new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, 10}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+ new int[]{100 + CELLSIZE - 2},
+ new int[]{100 + CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 2},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize + 10
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+ new int[]{CELLSIZE - 2},
+ new int[]{CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 12},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+ new int[]{CELLSIZE, CELLSIZE - 1, 0},
+ new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+ new int[]{1, 3 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{1, CELLSIZE, 9},
+ new int[]{CELLSIZE, CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+ new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+ new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE, 9},
+ new int[]{1, CELLSIZE, CELLSIZE}});
+ }
+
+ private LocatedStripedBlock createDummyLocatedBlock() {
+ final long blockGroupID = -1048576;
+ DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
+ String[] storageIDs = new String[TOTAL_SIZE];
+ StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
+ int[] indices = new int[TOTAL_SIZE];
+ for (int i = 0; i < TOTAL_SIZE; i++) {
+ locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
+ storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
+ storageTypes[i] = StorageType.DISK;
+ indices[i] = (i + 2) % GROUP_SIZE;
+ }
+ return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
+ locs, storageIDs, storageTypes, indices, 0, false, null);
+ }
+
+ @Test
+ public void testParseDummyStripedBlock() {
+ LocatedStripedBlock lsb = createDummyLocatedBlock();
+ LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
+ lsb, GROUP_SIZE, CELLSIZE);
+ assertEquals(GROUP_SIZE, blocks.length);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ assertFalse(blocks[j].isStriped());
+ assertEquals(j,
+ BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
+ assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
+ }
+ }
+
+ @Test
+ public void testParseStripedBlock() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE * numBlocks);
+
+ assertEquals(4, lbs.locatedBlockCount());
+ List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+ for (LocatedBlock lb : lbList) {
+ assertTrue(lb.isStriped());
+ }
+
+ for (int i = 0; i < numBlocks; i++) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
+ LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+ GROUP_SIZE, CELLSIZE);
+ assertEquals(GROUP_SIZE, blks.length);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ assertFalse(blks[j].isStriped());
+ assertEquals(j,
+ BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
+ assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
+ }
+ }
+ }
+
+ /**
+ * Test {@link DFSStripedInputStream#getBlockAt(long)}
+ */
+ @Test
+ public void testGetBlock() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE * numBlocks);
+ final DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+
+ List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+ for (LocatedBlock aLbList : lbList) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+ LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+ GROUP_SIZE, CELLSIZE);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+ assertEquals(blks[j].getBlock(), refreshed.getBlock());
+ assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+ assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+ }
+ }
+ }
+
+ @Test
+ public void testPread() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+ in.setCellSize(CELLSIZE);
+ int readSize = BLOCKSIZE;
+ byte[] readBuffer = new byte[readSize];
+ int ret = in.read(0, readBuffer, 0, readSize);
+
+ assertEquals(readSize, ret);
+ // TODO: verify read results with patterned data from HDFS-8117
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb386505/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index d965ae7..b2ff6c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -18,15 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
-import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import java.util.UUID;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -84,83 +75,10 @@ public class TestRecoverStripedBlocks {
}
}
- public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
- int numBlocks) throws Exception {
- DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.mkdirs(dir);
- dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
-
- FSDataOutputStream out = null;
- try {
- out = dfs.create(file, (short) 1); // create an empty file
-
- FSNamesystem ns = cluster.getNamesystem();
- FSDirectory fsdir = ns.getFSDirectory();
- INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-
- ExtendedBlock previous = null;
- for (int i = 0; i < numBlocks; i++) {
- Block newBlock = createBlock(cluster.getDataNodes(), ns,
- file.toString(), fileNode, dfs.getClient().getClientName(),
- previous);
- previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
- }
-
- ns.completeFile(file.toString(), dfs.getClient().getClientName(),
- previous, fileNode.getId());
- } finally {
- IOUtils.cleanup(null, out);
- }
- }
-
- static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
- String file, INodeFile fileNode, String clientName,
- ExtendedBlock previous) throws Exception {
- ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
- null);
-
- final BlockInfo lastBlock = fileNode.getLastBlock();
- final int groupSize = fileNode.getBlockReplication();
- // 1. RECEIVING_BLOCK IBR
- int i = 0;
- for (DataNode dn : dataNodes) {
- if (i < groupSize) {
- final Block block = new Block(lastBlock.getBlockId() + i++, 0,
- lastBlock.getGenerationStamp());
- DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
- StorageReceivedDeletedBlocks[] reports = DFSTestUtil
- .makeReportForReceivedBlock(block,
- ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
- for (StorageReceivedDeletedBlocks report : reports) {
- ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
- }
- }
- }
-
- // 2. RECEIVED_BLOCK IBR
- i = 0;
- for (DataNode dn : dataNodes) {
- if (i < groupSize) {
- final Block block = new Block(lastBlock.getBlockId() + i++,
- BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
- DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
- StorageReceivedDeletedBlocks[] reports = DFSTestUtil
- .makeReportForReceivedBlock(block,
- ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
- for (StorageReceivedDeletedBlocks report : reports) {
- ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
- }
- }
- }
-
- lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
- return lastBlock;
- }
-
@Test
public void testMissingStripedBlock() throws Exception {
final int numBlocks = 4;
- createECFile(cluster, filePath, dirPath, numBlocks);
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
@@ -172,7 +90,7 @@ public class TestRecoverStripedBlocks {
for (BlockInfo blk : blocks) {
assertTrue(blk.isStriped());
assertTrue(blk.isComplete());
- assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+ assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
final BlockInfoStriped sb = (BlockInfoStriped) blk;
assertEquals(GROUP_SIZE, sb.numNodes());
}