Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/04/13 23:08:32 UTC
[01/12] hadoop git commit: HDFS-7782. Erasure coding: pread from
files in striped layout. Contributed by Zhe Zhang and Jing Zhao
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 21d0ce4cc -> 6e202bac1 (forced update)
HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae8f0c12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae8f0c12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae8f0c12
Branch: refs/heads/HDFS-7285
Commit: ae8f0c12db2db9212e7e50a4219d24a3c27110e3
Parents: 38fa860
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue Apr 7 11:20:13 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:12 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 55 +++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 79 +++-
.../hadoop/hdfs/DFSStripedInputStream.java | 367 +++++++++++++++++++
.../hadoop/hdfs/protocol/HdfsConstants.java | 2 +-
.../hadoop/hdfs/protocol/LocatedBlock.java | 4 +
.../hdfs/protocol/LocatedStripedBlock.java | 5 +
.../blockmanagement/BlockInfoStriped.java | 6 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 92 ++++-
.../apache/hadoop/hdfs/TestReadStripedFile.java | 304 +++++++++++++++
.../namenode/TestRecoverStripedBlocks.java | 88 +----
11 files changed, 897 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index cd06e09..2ef1d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -236,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
+ private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final Sampler<?> traceSampler;
public DfsClientConf getConf() {
@@ -376,6 +377,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
if (numThreads > 0) {
this.initThreadsNumForHedgedReads(numThreads);
}
+ numThreads = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+ if (numThreads <= 0) {
+ LOG.warn("The value of "
+ + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+ + " must be greater than 0. The current setting is " + numThreads
+ + ". Reset it to the default value "
+ + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+ numThreads =
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
+ }
+ this.initThreadsNumForStripedReads(numThreads);
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
@@ -3148,6 +3162,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ /**
+ * Create thread pool for parallel reading in striped layout,
+ * STRIPED_READ_THREAD_POOL, if it does not already exist.
+ * @param num Number of threads for striped reads thread pool.
+ */
+ private void initThreadsNumForStripedReads(int num) {
+ assert num > 0;
+ if (STRIPED_READ_THREAD_POOL != null) {
+ return;
+ }
+ synchronized (DFSClient.class) {
+ if (STRIPED_READ_THREAD_POOL == null) {
+ STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedRead-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+ LOG.info("Execution for striped reading rejected, "
+ + "Executing in current thread");
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+ }
+ }
+
long getHedgedReadTimeout() {
return this.hedgedReadThresholdMillis;
}
@@ -3161,6 +3212,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return HEDGED_READ_THREAD_POOL;
}
+ ThreadPoolExecutor getStripedReadsThreadPool() {
+ return STRIPED_READ_THREAD_POOL;
+ }
+
boolean isHedgedReadsEnabled() {
return (HEDGED_READ_THREAD_POOL != null) &&
HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
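For context, a minimal sketch (not part of the patch) of how a client configuration could size the new striped-read pool before a DFSClient is constructed. The key name comes from the DFSConfigKeys addition below; the class name and chosen value are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class StripedReadPoolConfigExample {
  public static Configuration sizedConf() {
    Configuration conf = new Configuration();
    // Cap the shared striped-read pool at 12 threads; per the DFSClient constructor
    // change above, a value <= 0 is reset to the default (18) with a warning.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE, 12);
    return conf;
  }
}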
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6185604..35e279d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -645,7 +645,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.reject-unresolved-dn-topology-mapping";
public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
false;
-
+
+ public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE =
+ "dfs.client.striped.read.threadpool.size";
+ // With default 3+2 schema, each normal read could span 3 DNs. So this
+ // default value accommodates 6 read streams
+ public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18;
+
// Slow io warning log threshold settings for dfsclient and datanode.
public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.datanode.slow.io.warning.threshold.ms";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dd0f6fe..e94c41a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -93,7 +94,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
- private final DFSClient dfsClient;
+ protected final DFSClient dfsClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private final String src;
private final boolean verifyChecksum;
@@ -440,7 +441,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* @return located block
* @throws IOException
*/
- private LocatedBlock getBlockAt(long offset) throws IOException {
+ protected LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
@@ -705,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Wraps different possible read implementations so that readBuffer can be
* strategy-agnostic.
*/
- private interface ReaderStrategy {
+ interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
}
@@ -1048,7 +1049,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return errMsgr.toString();
}
- private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+ protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1090,13 +1091,42 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
};
}
+ /**
+ * Used when reading contiguous blocks
+ */
private void actualGetFromOneDataNode(final DNAddrPair datanode,
long blockStartOffset, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
+ final int length = (int) (end - start + 1);
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
+ new int[]{offset}, new int[]{length}, corruptedBlockMap);
+ }
+
+ /**
+ * Read data from one DataNode.
+ * @param datanode the datanode from which to read data
+ * @param block the block to read
+ * @param startInBlk the startInBlk offset of the block
+ * @param endInBlk the endInBlk offset of the block
+ * @param buf the given byte array into which the data is read
+ * @param offsets the data may be read into multiple segments of the buf
+ * (when reading a striped block). this array indicates the
+ * offset of each buf segment.
+ * @param lengths the length of each buf segment
+ * @param corruptedBlockMap map recording list of datanodes with corrupted
+ * block replica
+ */
+ void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long startInBlk, final long endInBlk,
+ byte[] buf, int[] offsets, int[] lengths,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
+ final int len = (int) (endInBlk - startInBlk + 1);
+ checkReadPortions(offsets, lengths, len);
while (true) {
// cached block locations may have been updated by chooseDataNode()
@@ -1117,7 +1147,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
- int len = (int) (end - start + 1);
reader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
@@ -1126,7 +1155,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setFileName(src).
setBlock(block.getBlock()).
setBlockToken(blockToken).
- setStartOffset(start).
+ setStartOffset(startInBlk).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(len).
@@ -1136,12 +1165,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
- int nread = reader.readAll(buf, offset, len);
- updateReadStatistics(readStatistics, nread, reader);
+ for (int i = 0; i < offsets.length; i++) {
+ int nread = reader.readAll(buf, offsets[i], lengths[i]);
+ updateReadStatistics(readStatistics, nread, reader);
- if (nread != len) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + len + ", got " + nread);
+ if (nread != len) {
+ throw new IOException("truncated return from reader.read(): " +
+ "excpected " + len + ", got " + nread);
+ }
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
return;
@@ -1187,6 +1218,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
+ * This method verifies that the read portions are valid and do not overlap
+ * with each other.
+ */
+ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+ Preconditions.checkArgument(offsets.length == lengths.length &&
+ offsets.length > 0);
+ int sum = 0;
+ for (int i = 0; i < lengths.length; i++) {
+ if (i > 0) {
+ int gap = offsets[i] - offsets[i - 1];
+ // make sure read portions do not overlap with each other
+ Preconditions.checkArgument(gap >= lengths[i - 1]);
+ }
+ sum += lengths[i];
+ }
+ Preconditions.checkArgument(sum == totalLen);
+ }
+
+ /**
* Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
* int, Map)} except we start up a second, parallel, 'hedged' read
* if the first read is taking longer than configured amount of
@@ -1407,10 +1457,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
- if (dfsClient.isHedgedReadsEnabled()) {
+ if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
- targetStart + bytesToRead - 1, buffer, offset,
- corruptedBlockMap);
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
} else {
fetchBlockByteRange(blk.getStartOffset(), targetStart,
targetStart + bytesToRead - 1, buffer, offset,
@@ -1606,7 +1655,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/** Utility class to encapsulate data node info and its address. */
- private static final class DNAddrPair {
+ static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
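As a standalone sketch of the contract documented above for the new offsets/lengths parameters (a simplified re-implementation for illustration, not the production method): the buffer segments must be non-empty, non-overlapping, and sum to the requested block-range length.

import com.google.common.base.Preconditions;

public class ReadPortionCheckExample {
  // Mirrors the checks in DFSInputStream#checkReadPortions for illustration.
  static void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
    Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
    int sum = 0;
    for (int i = 0; i < lengths.length; i++) {
      if (i > 0) {
        // consecutive buffer segments must not overlap
        Preconditions.checkArgument(offsets[i] - offsets[i - 1] >= lengths[i - 1]);
      }
      sum += lengths[i];
    }
    Preconditions.checkArgument(sum == totalLen);
  }

  public static void main(String[] args) {
    // e.g. two cells of a striped read landing at buffer offsets 0 and 256
    checkReadPortions(new int[]{0, 256}, new int[]{128, 64}, 192);  // passes
  }
}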
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
new file mode 100644
index 0000000..077b0f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+
+/******************************************************************************
+ * DFSStripedInputStream reads from striped block groups, illustrated below:
+ *
+ * | <- Striped Block Group -> |
+ * blk_0 blk_1 blk_2 <- A striped block group has
+ * | | | {@link #groupSize} blocks
+ * v v v
+ * +------+ +------+ +------+
+ * |cell_0| |cell_1| |cell_2| <- The logical read order should be
+ * +------+ +------+ +------+ cell_0, cell_1, ...
+ * |cell_3| |cell_4| |cell_5|
+ * +------+ +------+ +------+
+ * |cell_6| |cell_7| |cell_8|
+ * +------+ +------+ +------+
+ * |cell_9|
+ * +------+ <- A cell contains {@link #cellSize} bytes of data
+ *
+ * Three styles of read will eventually be supported:
+ * 1. Stateful read: TODO: HDFS-8033
+ * 2. pread without decode support
+ * This is implemented by calculating the portion of read from each block and
+ * issuing requests to each DataNode in parallel.
+ * 3. pread with decode support: TODO: will be supported after HDFS-7678
+ *****************************************************************************/
+public class DFSStripedInputStream extends DFSInputStream {
+ /**
+ * This method plans the read portion from each block in the stripe
+ * @param groupSize The size / width of the striping group
+ * @param cellSize The size of each striping cell
+ * @param startInBlk Starting offset in the striped block
+ * @param len Length of the read request
+ * @param bufOffset Initial offset in the result buffer
+ * @return array of {@link ReadPortion}, each representing the portion of I/O
+ * for an individual block in the group
+ */
+ @VisibleForTesting
+ static ReadPortion[] planReadPortions(final int groupSize,
+ final int cellSize, final long startInBlk, final int len, int bufOffset) {
+ ReadPortion[] results = new ReadPortion[groupSize];
+ for (int i = 0; i < groupSize; i++) {
+ results[i] = new ReadPortion();
+ }
+
+ // cellIdxInBlk is the index of the cell in the block
+ // E.g., cell_3 is the 2nd cell in blk_0
+ int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
+
+ // blkIdxInGroup is the index of the block in the striped block group
+ // E.g., blk_2 is the 3rd block in the group
+ final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
+ results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+ startInBlk % cellSize;
+ boolean crossStripe = false;
+ for (int i = 1; i < groupSize; i++) {
+ if (blkIdxInGroup + i >= groupSize && !crossStripe) {
+ cellIdxInBlk++;
+ crossStripe = true;
+ }
+ results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
+ cellSize * cellIdxInBlk;
+ }
+
+ int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+ results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+ results[blkIdxInGroup].lengths.add(firstCellLen);
+ results[blkIdxInGroup].readLength += firstCellLen;
+
+ int i = (blkIdxInGroup + 1) % groupSize;
+ for (int done = firstCellLen; done < len; done += cellSize) {
+ ReadPortion rp = results[i];
+ rp.offsetsInBuf.add(done + bufOffset);
+ final int readLen = Math.min(len - done, cellSize);
+ rp.lengths.add(readLen);
+ rp.readLength += readLen;
+ i = (i + 1) % groupSize;
+ }
+ return results;
+ }
+
+ /**
+ * This method parses a striped block group into individual blocks.
+ *
+ * @param bg The striped block group
+ * @param dataBlkNum the number of data blocks
+ * @return An array containing the blocks in the group
+ */
+ @VisibleForTesting
+ static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+ int dataBlkNum, int cellSize) {
+ int locatedBGSize = bg.getBlockIndices().length;
+ // TODO not considering missing blocks for now, only identify data blocks
+ LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
+ for (short i = 0; i < locatedBGSize; i++) {
+ final int idx = bg.getBlockIndices()[i];
+ if (idx < dataBlkNum && lbs[idx] == null) {
+ lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
+ }
+ }
+ return lbs;
+ }
+
+ private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+ int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
+ final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
+ blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
+ // TODO: fix the numBytes computation
+
+ return new LocatedBlock(blk,
+ new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+ new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+ new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+ bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+ null);
+ }
+
+
+ private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+
+ DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
+ throws IOException {
+ super(dfsClient, src, verifyChecksum);
+ DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+ }
+
+ @Override
+ public synchronized int read(final ByteBuffer buf) throws IOException {
+ throw new UnsupportedActionException("Stateful read is not supported");
+ }
+
+ @Override
+ public synchronized int read(final byte buf[], int off, int len)
+ throws IOException {
+ throw new UnsupportedActionException("Stateful read is not supported");
+ }
+
+ /**
+ * | <--------- LocatedStripedBlock (ID = 0) ---------> |
+ * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
+ * ^
+ * offset
+ * On a striped file, the super method {@link DFSInputStream#getBlockAt}
+ * treats a striped block group as a single {@link LocatedBlock} object,
+ * which includes the target offset in its range. This method adds the logic of:
+ * 1. Analyzing the index of required block based on offset
+ * 2. Parsing the block group to obtain the block location on that index
+ */
+ @Override
+ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
+ LocatedBlock lb = super.getBlockAt(blkStartOffset);
+ assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
+ "LocatedStripedBlock for a striped file";
+
+ int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
+ % groupSize);
+ // If indexing information is returned, iterate through the index array
+ // to find the entry for position idx in the group
+ LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+ int i = 0;
+ for (; i < lsb.getBlockIndices().length; i++) {
+ if (lsb.getBlockIndices()[i] == idx) {
+ break;
+ }
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+ }
+ return constructInternalBlock(lsb, i, cellSize, idx);
+ }
+
+ private LocatedBlock getBlockGroupAt(long offset) throws IOException {
+ return super.getBlockAt(offset);
+ }
+
+ /**
+ * Real implementation of pread.
+ */
+ @Override
+ protected void fetchBlockByteRange(LocatedBlock block, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ Map<Future<Void>, Integer> futures = new HashMap<>();
+ CompletionService<Void> stripedReadsService =
+ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+ int len = (int) (end - start + 1);
+
+ // Refresh the striped block group
+ block = getBlockGroupAt(block.getStartOffset());
+ assert block instanceof LocatedStripedBlock : "NameNode" +
+ " should return a LocatedStripedBlock for a striped file";
+ LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+
+ // Planning the portion of I/O for each shard
+ ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
+ len, offset);
+
+ // Parse group to get chosen DN location
+ LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
+
+ for (short i = 0; i < groupSize; i++) {
+ ReadPortion rp = readPortions[i];
+ if (rp.readLength <= 0) {
+ continue;
+ }
+ DatanodeInfo loc = blks[i].getLocations()[0];
+ StorageType type = blks[i].getStorageTypes()[0];
+ DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+ loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
+ Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
+ rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+ rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
+ Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
+ DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
+ futures.put(getFromDNRequest, (int) i);
+ }
+ while (!futures.isEmpty()) {
+ try {
+ waitNextCompletion(stripedReadsService, futures);
+ } catch (InterruptedException ie) {
+ // Ignore and retry
+ }
+ }
+ }
+
+ private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final byte[] buf, final int[] offsets, final int[] lengths,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final int hedgedReadId) {
+ final Span parentSpan = Trace.currentSpan();
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TraceScope scope =
+ Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
+ try {
+ actualGetFromOneDataNode(datanode, block, start,
+ end, buf, offsets, lengths, corruptedBlockMap);
+ } finally {
+ scope.close();
+ }
+ return null;
+ }
+ };
+ }
+
+ private void waitNextCompletion(CompletionService<Void> stripedReadsService,
+ Map<Future<Void>, Integer> futures) throws InterruptedException {
+ if (futures.isEmpty()) {
+ throw new InterruptedException("Futures already empty");
+ }
+ Future<Void> future = null;
+ try {
+ future = stripedReadsService.take();
+ future.get();
+ futures.remove(future);
+ } catch (ExecutionException | CancellationException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ }
+ throw new InterruptedException("let's retry");
+ }
+
+ public void setCellSize(int cellSize) {
+ this.cellSize = cellSize;
+ }
+
+ /**
+ * This class represents the portion of I/O associated with each block in the
+ * striped block group.
+ */
+ static class ReadPortion {
+ /**
+ * startOffsetInBlock
+ * |
+ * v
+ * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
+ * +------------------+------------------+----------------+
+ * | cell_0 | cell_3 | cell_6 | <- blk_0
+ * +------------------+------------------+----------------+
+ * _/ \_______________________
+ * | |
+ * v offsetsInBuf[0] v offsetsInBuf[1]
+ * +------------------------------------------------------+
+ * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
+ * | (partial) | (from blk_1 and blk_2) | |
+ * +------------------------------------------------------+
+ */
+ private long startOffsetInBlock = 0;
+ private long readLength = 0;
+ private final List<Integer> offsetsInBuf = new ArrayList<>();
+ private final List<Integer> lengths = new ArrayList<>();
+
+ int[] getOffsets() {
+ int[] offsets = new int[offsetsInBuf.size()];
+ for (int i = 0; i < offsets.length; i++) {
+ offsets[i] = offsetsInBuf.get(i);
+ }
+ return offsets;
+ }
+
+ int[] getLengths() {
+ int[] lens = new int[this.lengths.size()];
+ for (int i = 0; i < lens.length; i++) {
+ lens[i] = this.lengths.get(i);
+ }
+ return lens;
+ }
+
+ long getReadLength() {
+ return readLength;
+ }
+
+ long getStartOffsetInBlock() {
+ return startOffsetInBlock;
+ }
+ }
+}
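A minimal sketch of the index arithmetic that planReadPortions and getBlockAt above both rely on (simplified standalone form; the class and method names are illustrative): given a logical offset inside a block group, find which internal block, which cell of that block, and which byte of that cell it falls on.

public class StripeIndexMath {
  /** Returns {blockIndexInGroup, cellIndexInBlock, offsetInCell}. */
  static long[] locate(long offsetInGroup, int cellSize, int groupSize) {
    long blkIdxInGroup = offsetInGroup / cellSize % groupSize;          // round-robin over blocks
    long cellIdxInBlk = offsetInGroup / ((long) cellSize * groupSize);  // stripe number
    long offsetInCell = offsetInGroup % cellSize;
    return new long[]{blkIdxInGroup, cellIdxInBlk, offsetInCell};
  }

  public static void main(String[] args) {
    // With 3 data blocks and 128 KB cells, group offset 300 KB lands in cell_2,
    // i.e. blk_2, cell index 0 of that block, 44 KB into the cell.
    long[] loc = locate(300 * 1024, 128 * 1024, 3);
    System.out.println(loc[0] + " " + loc[1] + " " + loc[2]);
  }
}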
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index b2882af..a888aa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -186,5 +186,5 @@ public class HdfsConstants {
public static final byte MAX_BLOCKS_IN_GROUP = 16;
// The chunk size for striped block which is used by erasure coding
- public static final int BLOCK_STRIPED_CHUNK_SIZE = 64 * 1024;
+ public static final int BLOCK_STRIPED_CELL_SIZE = 128 * 1024;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index a38e8f2..4ba8193 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -212,5 +212,9 @@ public class LocatedBlock {
+ "; locs=" + Arrays.asList(locs)
+ "}";
}
+
+ public boolean isStriped() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 97e3a69..98614db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -65,4 +65,9 @@ public class LocatedStripedBlock extends LocatedBlock {
public int[] getBlockIndices() {
return this.blockIndices;
}
+
+ @Override
+ public boolean isStriped() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 4a85efb..20b0c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import java.io.DataOutput;
import java.io.IOException;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -203,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
// In case striped blocks, total usage by this striped blocks should
// be the total of data blocks and parity blocks because
// `getNumBytes` is the total of actual data block size.
- return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
- * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
+ return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1)
+ * BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes();
}
@Override
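A worked instance of the space computation above, under the assumption of a 3-data/2-parity layout and the 128 KB cell size (illustrative figures, not values taken from the patch).

public class StripedSpaceExample {
  public static void main(String[] args) {
    final long cell = 128 * 1024;                 // BLOCK_STRIPED_CELL_SIZE
    final int dataBlkNum = 3, parityBlkNum = 2;   // assumed 3+2 schema
    long numBytes = 3 * dataBlkNum * cell;        // three full stripes of data
    // same formula as spaceConsumed(): full stripes of parity plus the data itself
    long stripes = (numBytes - 1) / (dataBlkNum * cell) + 1;   // = 3
    long consumed = stripes * cell * parityBlkNum + numBytes;  // 786432 + 1179648
    System.out.println(consumed);                 // 1966080 bytes of data + parity
  }
}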
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index dc91c37..e3a85cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,12 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -107,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -122,8 +128,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -131,7 +139,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
@@ -151,12 +158,8 @@ import org.apache.log4j.Level;
import org.junit.Assume;
import org.mockito.internal.util.reflection.Whitebox;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -1807,4 +1810,77 @@ public class DFSTestUtil {
reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
return reports;
}
+
+ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+ int numBlocks, int numStripesPerBlk) throws Exception {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ dfs.mkdirs(dir);
+ dfs.getClient().createErasureCodingZone(dir.toString());
+
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1); // create an empty file
+
+ FSNamesystem ns = cluster.getNamesystem();
+ FSDirectory fsdir = ns.getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+ ExtendedBlock previous = null;
+ for (int i = 0; i < numBlocks; i++) {
+ Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
+ file.toString(), fileNode, dfs.getClient().getClientName(),
+ previous, numStripesPerBlk);
+ previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+ }
+
+ dfs.getClient().namenode.complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+
+ static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
+ FSNamesystem ns, String file, INodeFile fileNode, String clientName,
+ ExtendedBlock previous, int numStripes) throws Exception {
+ fs.getClient().namenode.addBlock(file, clientName, previous, null,
+ fileNode.getId(), null);
+
+ final BlockInfo lastBlock = fileNode.getLastBlock();
+ final int groupSize = fileNode.getBlockReplication();
+ // 1. RECEIVING_BLOCK IBR
+ int i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++,
+ numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
+ return lastBlock;
+ }
}
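A hedged usage sketch of the helper added above, modeled on how the new tests call it; 'cluster', 'fs' and 'BLOCKSIZE' are assumed to come from a MiniDFSCluster test setup like the one in TestReadStripedFile below, and the paths are illustrative.

// Sketch of a test body, after cluster.waitActive():
Path dir = new Path("/ec");
Path file = new Path(dir, "file");
// two block groups, each with two full stripes of data, written via fake IBRs
DFSTestUtil.createECFile(cluster, file, dir, 2, 2);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
    file.toString(), 0, 2L * BLOCKSIZE);
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
  assert lb.isStriped();   // every group should come back as a LocatedStripedBlock
}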
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
new file mode 100644
index 0000000..0032bdd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestReadStripedFile {
+
+ public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
+
+ private MiniDFSCluster cluster;
+ private Configuration conf = new Configuration();
+ private DistributedFileSystem fs;
+ private final Path dirPath = new Path("/striped");
+ private Path filePath = new Path(dirPath, "file");
+ private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
+ private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final int NUM_STRIPE_PER_BLOCK = 2;
+ private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
+
+ @Before
+ public void setup() throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+ SimulatedFSDataset.setFactory(conf);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
+ .build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void testPlanReadPortions(int startInBlk, int length,
+ int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+ int[][] bufferOffsets, int[][] bufferLengths) {
+ ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+ CELLSIZE, startInBlk, length, bufferOffset);
+ assertEquals(GROUP_SIZE, results.length);
+
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ assertEquals(readLengths[i], results[i].getReadLength());
+ assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+ final int[] bOffsets = results[i].getOffsets();
+ assertArrayEquals(bufferOffsets[i], bOffsets);
+ final int[] bLengths = results[i].getLengths();
+ assertArrayEquals(bufferLengths[i], bLengths);
+ }
+ }
+
+ /**
+ * Test {@link DFSStripedInputStream#planReadPortions}
+ */
+ @Test
+ public void testPlanReadPortions() {
+ /**
+ * start block offset is 0, read cellSize - 10
+ */
+ testPlanReadPortions(0, CELLSIZE - 10, 0,
+ new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 3 * cellSize
+ */
+ testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 0, read cellSize + 10
+ */
+ testPlanReadPortions(0, CELLSIZE + 10, 0,
+ new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+ */
+ testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+ new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+ new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+ new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, 10}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+ new int[]{100 + CELLSIZE - 2},
+ new int[]{100 + CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 2},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize + 10
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+ new int[]{CELLSIZE - 2},
+ new int[]{CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 12},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+ new int[]{CELLSIZE, CELLSIZE - 1, 0},
+ new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+ new int[]{1, 3 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{1, CELLSIZE, 9},
+ new int[]{CELLSIZE, CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+ new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+ new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE, 9},
+ new int[]{1, CELLSIZE, CELLSIZE}});
+ }
+
+ private LocatedStripedBlock createDummyLocatedBlock() {
+ final long blockGroupID = -1048576;
+ DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
+ String[] storageIDs = new String[TOTAL_SIZE];
+ StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
+ int[] indices = new int[TOTAL_SIZE];
+ for (int i = 0; i < TOTAL_SIZE; i++) {
+ locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
+ storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
+ storageTypes[i] = StorageType.DISK;
+ indices[i] = (i + 2) % GROUP_SIZE;
+ }
+ return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
+ locs, storageIDs, storageTypes, indices, 0, false, null);
+ }
+
+ @Test
+ public void testParseDummyStripedBlock() {
+ LocatedStripedBlock lsb = createDummyLocatedBlock();
+ LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
+ lsb, GROUP_SIZE, CELLSIZE);
+ assertEquals(GROUP_SIZE, blocks.length);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ assertFalse(blocks[j].isStriped());
+ assertEquals(j,
+ BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
+ assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
+ }
+ }
+
+ @Test
+ public void testParseStripedBlock() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE * numBlocks);
+
+ assertEquals(4, lbs.locatedBlockCount());
+ List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+ for (LocatedBlock lb : lbList) {
+ assertTrue(lb.isStriped());
+ }
+
+ for (int i = 0; i < numBlocks; i++) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
+ LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+ GROUP_SIZE, CELLSIZE);
+ assertEquals(GROUP_SIZE, blks.length);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ assertFalse(blks[j].isStriped());
+ assertEquals(j,
+ BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
+ assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
+ }
+ }
+ }
+
+ /**
+ * Test {@link DFSStripedInputStream#getBlockAt(long)}
+ */
+ @Test
+ public void testGetBlock() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE * numBlocks);
+ final DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+
+ List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+ for (LocatedBlock aLbList : lbList) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+ LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+ GROUP_SIZE, CELLSIZE);
+ for (int j = 0; j < GROUP_SIZE; j++) {
+ LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+ assertEquals(blks[j].getBlock(), refreshed.getBlock());
+ assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+ assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+ }
+ }
+ }
+
+ @Test
+ public void testPread() throws Exception {
+ final int numBlocks = 4;
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+ NUM_STRIPE_PER_BLOCK);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCKSIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+ in.setCellSize(CELLSIZE);
+ int readSize = BLOCKSIZE;
+ byte[] readBuffer = new byte[readSize];
+ int ret = in.read(0, readBuffer, 0, readSize);
+
+ assertEquals(readSize, ret);
+ // TODO: verify read results with patterned data from HDFS-8117
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index d965ae7..b2ff6c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -18,15 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
-import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import java.util.UUID;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -84,83 +75,10 @@ public class TestRecoverStripedBlocks {
}
}
- public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
- int numBlocks) throws Exception {
- DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.mkdirs(dir);
- dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
-
- FSDataOutputStream out = null;
- try {
- out = dfs.create(file, (short) 1); // create an empty file
-
- FSNamesystem ns = cluster.getNamesystem();
- FSDirectory fsdir = ns.getFSDirectory();
- INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-
- ExtendedBlock previous = null;
- for (int i = 0; i < numBlocks; i++) {
- Block newBlock = createBlock(cluster.getDataNodes(), ns,
- file.toString(), fileNode, dfs.getClient().getClientName(),
- previous);
- previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
- }
-
- ns.completeFile(file.toString(), dfs.getClient().getClientName(),
- previous, fileNode.getId());
- } finally {
- IOUtils.cleanup(null, out);
- }
- }
-
- static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
- String file, INodeFile fileNode, String clientName,
- ExtendedBlock previous) throws Exception {
- ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
- null);
-
- final BlockInfo lastBlock = fileNode.getLastBlock();
- final int groupSize = fileNode.getBlockReplication();
- // 1. RECEIVING_BLOCK IBR
- int i = 0;
- for (DataNode dn : dataNodes) {
- if (i < groupSize) {
- final Block block = new Block(lastBlock.getBlockId() + i++, 0,
- lastBlock.getGenerationStamp());
- DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
- StorageReceivedDeletedBlocks[] reports = DFSTestUtil
- .makeReportForReceivedBlock(block,
- ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
- for (StorageReceivedDeletedBlocks report : reports) {
- ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
- }
- }
- }
-
- // 2. RECEIVED_BLOCK IBR
- i = 0;
- for (DataNode dn : dataNodes) {
- if (i < groupSize) {
- final Block block = new Block(lastBlock.getBlockId() + i++,
- BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
- DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
- StorageReceivedDeletedBlocks[] reports = DFSTestUtil
- .makeReportForReceivedBlock(block,
- ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
- for (StorageReceivedDeletedBlocks report : reports) {
- ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
- }
- }
- }
-
- lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
- return lastBlock;
- }
-
@Test
public void testMissingStripedBlock() throws Exception {
final int numBlocks = 4;
- createECFile(cluster, filePath, dirPath, numBlocks);
+ DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
@@ -172,7 +90,7 @@ public class TestRecoverStripedBlocks {
for (BlockInfo blk : blocks) {
assertTrue(blk.isStriped());
assertTrue(blk.isComplete());
- assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+ assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
final BlockInfoStriped sb = (BlockInfoStriped) blk;
assertEquals(GROUP_SIZE, sb.numNodes());
}
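The hunk above replaces the test-local helper with the shared DFSTestUtil.createECFile, which now also takes a numStripesPerBlk argument. Below is a minimal, self-contained sketch of how a caller might drive that helper against a MiniDFSCluster; the cluster size, paths, and block/stripe counts are illustrative assumptions, not values from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class CreateECFileSketch {
      public static void main(String[] args) throws Exception {
        // Assumed group size (data + parity datanodes); adjust to the schema in use.
        final int groupSize = 9;
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(groupSize).build();
        try {
          cluster.waitActive();
          Path dir = new Path("/ecDir");
          Path file = new Path(dir, "ecFile");
          // 4 block groups with 1 stripe each, mirroring the call in the test above.
          // The helper creates the EC zone on dir and allocates striped blocks itself.
          DFSTestUtil.createECFile(cluster, file, dir, 4, 1);
        } finally {
          cluster.shutdown();
        }
      }
    }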
[04/12] hadoop git commit: HADOOP-11818 Minor improvements for
erasurecode classes. Contributed by Rakesh R
Posted by zh...@apache.org.
HADOOP-11818 Minor improvements for erasurecode classes. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc2202c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc2202c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc2202c2
Branch: refs/heads/HDFS-7285
Commit: cc2202c276f71de4363c509a6223281d58f83f3f
Parents: d8c0ecf
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 10 04:31:48 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700
----------------------------------------------------------------------
.../hadoop-common/CHANGES-HDFS-EC-7285.txt | 2 ++
.../hadoop/io/erasurecode/SchemaLoader.java | 12 ++++++------
.../io/erasurecode/coder/RSErasureDecoder.java | 19 ++++++++++++++++++-
.../io/erasurecode/coder/RSErasureEncoder.java | 19 ++++++++++++++++++-
.../io/erasurecode/coder/XORErasureDecoder.java | 2 +-
.../io/erasurecode/rawcoder/util/RSUtil.java | 17 +++++++++++++++++
6 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index c72394e..b850e11 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -40,3 +40,5 @@
HADOOP-11645. Erasure Codec API covering the essential aspects for an erasure code
( Kai Zheng via vinayakumarb )
+
+ HADOOP-11818. Minor improvements for erasurecode classes. (Rakesh R via Kai Zheng)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
index c51ed37..75dd03a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.io.erasurecode;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.w3c.dom.*;
@@ -36,7 +36,7 @@ import java.util.*;
* A EC schema loading utility that loads predefined EC schemas from XML file
*/
public class SchemaLoader {
- private static final Log LOG = LogFactory.getLog(SchemaLoader.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaLoader.class.getName());
/**
* Load predefined ec schemas from configuration file. This file is
@@ -63,7 +63,7 @@ public class SchemaLoader {
private List<ECSchema> loadSchema(File schemaFile)
throws ParserConfigurationException, IOException, SAXException {
- LOG.info("Loading predefined EC schema file " + schemaFile);
+ LOG.info("Loading predefined EC schema file {}", schemaFile);
// Read and parse the schema file.
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
@@ -87,7 +87,7 @@ public class SchemaLoader {
ECSchema schema = loadSchema(element);
schemas.add(schema);
} else {
- LOG.warn("Bad element in EC schema configuration file: " +
+ LOG.warn("Bad element in EC schema configuration file: {}",
element.getTagName());
}
}
@@ -109,7 +109,7 @@ public class SchemaLoader {
URL url = Thread.currentThread().getContextClassLoader()
.getResource(schemaFilePath);
if (url == null) {
- LOG.warn(schemaFilePath + " not found on the classpath.");
+ LOG.warn("{} not found on the classpath.", schemaFilePath);
schemaFile = null;
} else if (! url.getProtocol().equalsIgnoreCase("file")) {
throw new RuntimeException(
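The SchemaLoader hunks above switch from commons-logging to SLF4J and replace string concatenation with {} placeholders. A minimal standalone sketch of that logging pattern, with an illustrative file name rather than the loader's real configuration key:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParameterizedLoggingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

      public static void main(String[] args) {
        String schemaFile = "ecschema-def.xml";   // illustrative value only
        // The {} placeholder is substituted only when the level is enabled,
        // so the message is not built eagerly as it was with concatenation.
        LOG.info("Loading predefined EC schema file {}", schemaFile);
        LOG.warn("{} not found on the classpath.", schemaFile);
      }
    }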
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
index e2c5051..fc664a5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.conf.Configuration;
@@ -11,7 +28,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
/**
* Reed-Solomon erasure decoder that decodes a block group.
*
- * It implements {@link ErasureDecoder}.
+ * It implements {@link ErasureCoder}.
*/
public class RSErasureDecoder extends AbstractErasureDecoder {
private RawErasureDecoder rsRawDecoder;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
index a7d02b5..18ca5ac 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -9,7 +26,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
* Reed-Solomon erasure encoder that encodes a block group.
*
- * It implements {@link ErasureEncoder}.
+ * It implements {@link ErasureCoder}.
*/
public class RSErasureEncoder extends AbstractErasureEncoder {
private RawErasureEncoder rawEncoder;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
index 6f4b423..0672549 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
/**
* Xor erasure decoder that decodes a block group.
*
- * It implements {@link ErasureDecoder}.
+ * It implements {@link ErasureCoder}.
*/
public class XORErasureDecoder extends AbstractErasureDecoder {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
index 33ba561..8badf02 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.io.erasurecode.rawcoder.util;
/**
[12/12] hadoop git commit: HDFS-8114. Erasure coding: Add audit log to
FSNamesystem#createErasureCodingZone if this operation fails. Contributed by
Rakesh R.
Posted by zh...@apache.org.
HDFS-8114. Erasure coding: Add audit log to FSNamesystem#createErasureCodingZone if this operation fails. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e202bac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e202bac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e202bac
Branch: refs/heads/HDFS-7285
Commit: 6e202bac1aea7de46bd836d6039c4be543982170
Parents: 9708ae6
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 11:15:02 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:21 2015 -0700
----------------------------------------------------------------------
.../hdfs/server/namenode/FSNamesystem.java | 21 ++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e202bac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index dfd0382..1a9c529 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8116,11 +8116,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
SafeModeException, AccessControlException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
- checkSuperuserPrivilege();
- checkOperation(OperationCategory.WRITE);
- final byte[][] pathComponents =
- FSDirectory.getPathComponentsForReservedPath(src);
- FSPermissionChecker pc = getPermissionChecker();
+ FSPermissionChecker pc = null;
+ byte[][] pathComponents = null;
+ boolean success = false;
+ try {
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.WRITE);
+ pathComponents =
+ FSDirectory.getPathComponentsForReservedPath(src);
+ pc = getPermissionChecker();
+ } catch (Throwable e) {
+ logAuditEvent(success, "createErasureCodingZone", srcArg);
+ throw e;
+ }
writeLock();
try {
checkSuperuserPrivilege();
@@ -8134,11 +8142,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
final INodesInPath iip = dir.getINodesInPath4Write(src, false);
resultingStat = dir.getAuditFileInfo(iip);
+ success = true;
} finally {
writeUnlock();
}
getEditLog().logSync();
- logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
+ logAuditEvent(success, "createErasureCodingZone", srcArg, null, resultingStat);
}
/**
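The FSNamesystem patch above audits failed createErasureCodingZone attempts: precondition checks are wrapped so a failure is logged with success=false before rethrowing, and the final audit entry reuses the success flag instead of a hardcoded true. The sketch below shows only the shape of that pattern; checkPreconditions, applyChange, and logAuditEvent are stand-ins, not the FSNamesystem methods.

    import java.io.IOException;

    public class AuditOnFailureSketch {

      void createZone(String src) throws IOException {
        boolean success = false;
        try {
          checkPreconditions(src);      // e.g. superuser and operation-category checks
        } catch (Throwable e) {
          // HDFS-8114: audit the failed attempt before propagating the error.
          logAuditEvent(success, "createErasureCodingZone", src);
          throw e;
        }
        applyChange(src);               // in HDFS this part runs under the write lock
        success = true;                 // flipped only once the change succeeds
        logAuditEvent(success, "createErasureCodingZone", src);
      }

      private void checkPreconditions(String src) throws IOException { }
      private void applyChange(String src) throws IOException { }
      private void logAuditEvent(boolean allowed, String cmd, String src) {
        System.out.println((allowed ? "allowed cmd=" : "denied cmd=") + cmd + " src=" + src);
      }
    }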
[08/12] hadoop git commit: HDFS-8122. Erasure Coding: Support
specifying ECSchema during creation of ECZone. Contributed by Vinayakumar B.
Posted by zh...@apache.org.
HDFS-8122. Erasure Coding: Support specifying ECSchema during creation of ECZone. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9708ae63
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9708ae63
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9708ae63
Branch: refs/heads/HDFS-7285
Commit: 9708ae637b51e9235318a1fae962f9d1f73132dd
Parents: f8992da
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 11:08:57 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 6 ++--
.../hadoop/hdfs/DistributedFileSystem.java | 33 ++++++++++++++++++++
.../hadoop/hdfs/protocol/ClientProtocol.java | 6 ++--
...tNamenodeProtocolServerSideTranslatorPB.java | 4 ++-
.../ClientNamenodeProtocolTranslatorPB.java | 5 ++-
.../namenode/ErasureCodingZoneManager.java | 30 +++++++++++++-----
.../hdfs/server/namenode/FSDirectory.java | 23 ++++++++------
.../hdfs/server/namenode/FSNamesystem.java | 19 ++++++-----
.../hdfs/server/namenode/NameNodeRpcServer.java | 6 ++--
.../src/main/proto/ClientNamenodeProtocol.proto | 1 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +-
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 2 +-
.../hadoop/hdfs/TestErasureCodingZones.java | 18 +++++------
.../server/namenode/TestAddStripedBlocks.java | 2 +-
.../server/namenode/TestFSEditLogLoader.java | 4 +--
.../hdfs/server/namenode/TestFSImage.java | 4 +--
16 files changed, 112 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 6339d30..7ff9073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1322,7 +1322,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Progressable progress,
int buffersize,
ChecksumOpt checksumOpt) throws IOException {
- return create(src, permission, flag, createParent, replication, blockSize,
+ return create(src, permission, flag, createParent, replication, blockSize,
progress, buffersize, checksumOpt, null);
}
@@ -2966,12 +2966,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return new EncryptionZoneIterator(namenode, traceSampler);
}
- public void createErasureCodingZone(String src)
+ public void createErasureCodingZone(String src, ECSchema schema)
throws IOException {
checkOpen();
TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
try {
- namenode.createErasureCodingZone(src);
+ namenode.createErasureCodingZone(src, schema);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
SafeModeException.class,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 21f5107..160aae3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
@@ -2217,4 +2218,36 @@ public class DistributedFileSystem extends FileSystem {
throws IOException {
return dfs.getInotifyEventStream(lastReadTxid);
}
+
+ /**
+ * Create the erasurecoding zone
+ *
+ * @param path Directory to create the ec zone
+ * @param schema ECSchema for the zone. If not specified default will be used.
+ * @throws IOException
+ */
+ public void createErasureCodingZone(final Path path, final ECSchema schema)
+ throws IOException {
+ Path absF = fixRelativePart(path);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p) throws IOException,
+ UnresolvedLinkException {
+ dfs.createErasureCodingZone(getPathName(p), null);
+ return null;
+ }
+
+ @Override
+ public Void next(final FileSystem fs, final Path p) throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+ myDfs.createErasureCodingZone(p, schema);
+ return null;
+ }
+ throw new UnsupportedOperationException(
+ "Cannot createErasureCodingZone through a symlink to a "
+ + "non-DistributedFileSystem: " + path + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
}
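With the DistributedFileSystem method added above, a client can create an EC zone directly, optionally passing an ECSchema. A minimal usage sketch, assuming fs.defaultFS points at a namenode built from this branch and that the caller has superuser privilege; passing null falls back to the system default schema:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class CreateECZoneSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IllegalStateException("Erasure coding zones require HDFS");
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        Path zone = new Path("/ecZone");
        dfs.mkdirs(zone);                    // the zone root must be an empty directory
        // null schema means "use the system default" per the new ClientProtocol contract.
        dfs.createErasureCodingZone(zone, null);
      }
    }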
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 7f5ac49..0c04ca9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1364,11 +1364,11 @@ public interface ClientProtocol {
long prevId) throws IOException;
/**
- * Create an erasure coding zone (currently with hardcoded schema)
- * TODO: Configurable and pluggable schemas (HDFS-7337)
+ * Create an erasure coding zone with specified schema, if any, otherwise
+ * default
*/
@Idempotent
- public void createErasureCodingZone(String src)
+ public void createErasureCodingZone(String src, ECSchema schema)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 33207ab..cc5ca55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -1403,7 +1403,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, CreateErasureCodingZoneRequestProto req)
throws ServiceException {
try {
- server.createErasureCodingZone(req.getSrc());
+ ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
+ .getSchema()) : null;
+ server.createErasureCodingZone(req.getSrc(), schema);
return CreateErasureCodingZoneResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 0211522..2e17823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -1420,11 +1420,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public void createErasureCodingZone(String src)
+ public void createErasureCodingZone(String src, ECSchema schema)
throws IOException {
final CreateErasureCodingZoneRequestProto.Builder builder =
CreateErasureCodingZoneRequestProto.newBuilder();
builder.setSrc(src);
+ if (schema != null) {
+ builder.setSchema(PBHelper.convertECSchema(schema));
+ }
CreateErasureCodingZoneRequestProto req = builder.build();
try {
rpcProxy.createErasureCodingZone(null, req);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index 606e804..c7daa2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,7 +53,11 @@ public class ErasureCodingZoneManager {
this.dir = dir;
}
- boolean getECPolicy(INodesInPath iip) {
+ boolean getECPolicy(INodesInPath iip) throws IOException {
+ return getECSchema(iip) != null;
+ }
+
+ ECSchema getECSchema(INodesInPath iip) throws IOException{
assert dir.hasReadLock();
Preconditions.checkNotNull(iip);
List<INode> inodes = iip.getReadOnlyINodes();
@@ -64,21 +71,23 @@ public class ErasureCodingZoneManager {
// EC
// TODO: properly support symlinks in EC zones
if (inode.isSymlink()) {
- return false;
+ return null;
}
final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
new ArrayList<XAttr>(0)
: inode.getXAttrFeature().getXAttrs();
for (XAttr xAttr : xAttrs) {
if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
- return true;
+ ECSchemaProto ecSchemaProto;
+ ecSchemaProto = ECSchemaProto.parseFrom(xAttr.getValue());
+ return PBHelper.convertECSchema(ecSchemaProto);
}
}
}
- return false;
+ return null;
}
- XAttr createErasureCodingZone(String src)
+ XAttr createErasureCodingZone(String src, ECSchema schema)
throws IOException {
assert dir.hasWriteLock();
final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
@@ -97,8 +106,15 @@ public class ErasureCodingZoneManager {
throw new IOException("Directory " + src + " is already in an " +
"erasure coding zone.");
}
- final XAttr ecXAttr = XAttrHelper
- .buildXAttr(XATTR_ERASURECODING_ZONE, null);
+ // TODO HDFS-7859 Need to persist the schema in xattr in efficient way
+ // As of now storing the protobuf format
+ if (schema == null) {
+ schema = ECSchemaManager.getSystemDefaultSchema();
+ }
+ ECSchemaProto schemaProto = PBHelper.convertECSchema(schema);
+ byte[] schemaBytes = schemaProto.toByteArray();
+ final XAttr ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
+ schemaBytes);
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
xattrs.add(ecXAttr);
FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
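The zone manager above persists the chosen schema as a protobuf-encoded xattr value and parses it back in getECSchema. The round trip in isolation looks roughly like the sketch below; it assumes the PBHelper and ECSchemaProto classes from this patch, and the five-argument ECSchema constructor that appears elsewhere in this series is used only to build an example schema.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.io.erasurecode.ECSchema;

    public class SchemaXAttrRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Map<String, String> options = new HashMap<String, String>();
        ECSchema schema = new ECSchema("RS-6-3", "rs", 6, 3, options);

        // Serialize the way createErasureCodingZone does before storing the xattr value.
        byte[] xattrValue = PBHelper.convertECSchema(schema).toByteArray();

        // ...and recover it the way getECSchema does when reading the zone back.
        ECSchema restored =
            PBHelper.convertECSchema(ECSchemaProto.parseFrom(xattrValue));
        System.out.println(restored);
      }
    }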
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index d68128a..188425e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -63,9 +63,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -414,11 +414,12 @@ public class FSDirectory implements Closeable {
* Add the given filename to the fs.
* @return the new INodesInPath instance that contains the new INode
*/
- INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
- permissions, short replication, long preferredBlockSize,
+ INodesInPath addFile(INodesInPath existing, String localName,
+ PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine)
- throws FileAlreadyExistsException, QuotaExceededException,
- UnresolvedLinkException, SnapshotAccessControlException, AclException {
+ throws FileAlreadyExistsException, QuotaExceededException,
+ UnresolvedLinkException, SnapshotAccessControlException, AclException,
+ IOException {
long modTime = now();
INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
@@ -1400,20 +1401,24 @@ public class FSDirectory implements Closeable {
}
}
- XAttr createErasureCodingZone(String src)
+ XAttr createErasureCodingZone(String src, ECSchema schema)
throws IOException {
writeLock();
try {
- return ecZoneManager.createErasureCodingZone(src);
+ return ecZoneManager.createErasureCodingZone(src, schema);
} finally {
writeUnlock();
}
}
- public boolean getECPolicy(INodesInPath iip) {
+ public boolean getECPolicy(INodesInPath iip) throws IOException {
+ return getECSchema(iip) != null;
+ }
+
+ ECSchema getECSchema(INodesInPath iip) throws IOException {
readLock();
try {
- return ecZoneManager.getECPolicy(iip);
+ return ecZoneManager.getECSchema(iip);
} finally {
readUnlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6d89b0c..dfd0382 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8103,16 +8103,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Create an erasure coding zone on directory src.
- *
+ * @param schema ECSchema for the erasure coding zone
* @param src the path of a directory which will be the root of the
* erasure coding zone. The directory must be empty.
+ *
* @throws AccessControlException if the caller is not the superuser.
* @throws UnresolvedLinkException if the path can't be resolved.
* @throws SafeModeException if the Namenode is in safe mode.
*/
- void createErasureCodingZone(final String srcArg,
- final boolean logRetryCache)
- throws IOException, UnresolvedLinkException,
+ void createErasureCodingZone(final String srcArg, final ECSchema schema,
+ final boolean logRetryCache) throws IOException, UnresolvedLinkException,
SafeModeException, AccessControlException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
@@ -8128,7 +8128,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
src = dir.resolvePath(pc, src, pathComponents);
- final XAttr ecXAttr = dir.createErasureCodingZone(src);
+ final XAttr ecXAttr = dir.createErasureCodingZone(src, schema);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(ecXAttr);
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
@@ -8158,11 +8158,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.READ);
}
- if (dir.getECPolicy(iip)) {
- // TODO HDFS-8074 and HDFS-7859 : To get from loaded schemas
- Map<String, String> options = new HashMap<String, String>();
- ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
- return new ECInfo(src, defaultSchema);
+ // Get schema set for the zone
+ ECSchema schema = dir.getECSchema(iip);
+ if (schema != null) {
+ return new ECInfo(src, schema);
}
} finally {
readUnlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 85ac1e5..5e01c77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1836,8 +1836,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public void createErasureCodingZone(String src)
- throws IOException {
+ public void createErasureCodingZone(String src, ECSchema schema)
+ throws IOException {
checkNNStartup();
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1845,7 +1845,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
boolean success = false;
try {
- namesystem.createErasureCodingZone(src, cacheEntry != null);
+ namesystem.createErasureCodingZone(src, schema, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, success);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 3389a22..c9059bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -716,6 +716,7 @@ message GetEditsFromTxidResponseProto {
message CreateErasureCodingZoneRequestProto {
required string src = 1;
+ optional ECSchemaProto schema = 2;
}
message CreateErasureCodingZoneResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index e3a85cc..4c80388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1815,7 +1815,7 @@ public class DFSTestUtil {
int numBlocks, int numStripesPerBlk) throws Exception {
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
- dfs.getClient().createErasureCodingZone(dir.toString());
+ dfs.getClient().createErasureCodingZone(dir.toString(), null);
FSDataOutputStream out = null;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index ee6998b..c78922e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -50,7 +50,7 @@ public class TestDFSStripedOutputStream {
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
- cluster.getFileSystem().getClient().createErasureCodingZone("/");
+ cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
index bdca915..699df4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
@@ -64,7 +64,7 @@ public class TestErasureCodingZones {
fs.mkdir(testDir, FsPermission.getDirDefault());
/* Normal creation of an erasure coding zone */
- fs.getClient().createErasureCodingZone(testDir.toString());
+ fs.getClient().createErasureCodingZone(testDir.toString(), null);
/* Verify files under the zone are striped */
final Path ECFilePath = new Path(testDir, "foo");
@@ -77,7 +77,7 @@ public class TestErasureCodingZones {
fs.mkdir(notEmpty, FsPermission.getDirDefault());
fs.create(new Path(notEmpty, "foo"));
try {
- fs.getClient().createErasureCodingZone(notEmpty.toString());
+ fs.getClient().createErasureCodingZone(notEmpty.toString(), null);
fail("Erasure coding zone on non-empty dir");
} catch (IOException e) {
assertExceptionContains("erasure coding zone for a non-empty directory", e);
@@ -87,10 +87,10 @@ public class TestErasureCodingZones {
final Path zone1 = new Path("/zone1");
final Path zone2 = new Path(zone1, "zone2");
fs.mkdir(zone1, FsPermission.getDirDefault());
- fs.getClient().createErasureCodingZone(zone1.toString());
+ fs.getClient().createErasureCodingZone(zone1.toString(), null);
fs.mkdir(zone2, FsPermission.getDirDefault());
try {
- fs.getClient().createErasureCodingZone(zone2.toString());
+ fs.getClient().createErasureCodingZone(zone2.toString(), null);
fail("Nested erasure coding zones");
} catch (IOException e) {
assertExceptionContains("already in an erasure coding zone", e);
@@ -100,7 +100,7 @@ public class TestErasureCodingZones {
final Path fPath = new Path("/file");
fs.create(fPath);
try {
- fs.getClient().createErasureCodingZone(fPath.toString());
+ fs.getClient().createErasureCodingZone(fPath.toString(), null);
fail("Erasure coding zone on file");
} catch (IOException e) {
assertExceptionContains("erasure coding zone for a file", e);
@@ -113,8 +113,8 @@ public class TestErasureCodingZones {
final Path dstECDir = new Path("/dstEC");
fs.mkdir(srcECDir, FsPermission.getDirDefault());
fs.mkdir(dstECDir, FsPermission.getDirDefault());
- fs.getClient().createErasureCodingZone(srcECDir.toString());
- fs.getClient().createErasureCodingZone(dstECDir.toString());
+ fs.getClient().createErasureCodingZone(srcECDir.toString(), null);
+ fs.getClient().createErasureCodingZone(dstECDir.toString(), null);
final Path srcFile = new Path(srcECDir, "foo");
fs.create(srcFile);
@@ -158,7 +158,7 @@ public class TestErasureCodingZones {
// dir ECInfo before creating ec zone
assertNull(fs.getClient().getErasureCodingInfo(src));
// dir ECInfo after creating ec zone
- fs.getClient().createErasureCodingZone(src);
+ fs.getClient().createErasureCodingZone(src, null);
verifyErasureCodingInfo(src);
fs.create(new Path(ecDir, "/child1")).close();
// verify for the files in ec zone
@@ -182,4 +182,4 @@ public class TestErasureCodingZones {
assertEquals("Default chunkSize should be used",
ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index c3c8239..27df1cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -68,7 +68,7 @@ public class TestAddStripedBlocks {
.numDataNodes(GROUP_SIZE).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
- dfs.getClient().createErasureCodingZone("/");
+ dfs.getClient().createErasureCodingZone("/", null);
}
@After
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 0eeb7f8..c18fd5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -444,7 +444,7 @@ public class TestFSEditLogLoader {
//set the storage policy of the directory
fs.mkdir(new Path(testDir), new FsPermission("755"));
- fs.getClient().getNamenode().createErasureCodingZone(testDir);
+ fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
// Create a file with striped block
Path p = new Path(testFilePath);
@@ -516,7 +516,7 @@ public class TestFSEditLogLoader {
//set the storage policy of the directory
fs.mkdir(new Path(testDir), new FsPermission("755"));
- fs.getClient().getNamenode().createErasureCodingZone(testDir);
+ fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
//create a file with striped blocks
Path p = new Path(testFilePath);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index a456cad..fe130a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -135,7 +135,7 @@ public class TestFSImage {
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
boolean isUC) throws IOException{
// contruct a INode with StripedBlock for saving and loading
- fsn.createErasureCodingZone("/", false);
+ fsn.createErasureCodingZone("/", null, false);
long id = 123456789;
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
@@ -397,7 +397,7 @@ public class TestFSImage {
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
- fs.getClient().getNamenode().createErasureCodingZone("/");
+ fs.getClient().getNamenode().createErasureCodingZone("/", null);
Path file = new Path("/striped");
FSDataOutputStream out = fs.create(file);
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
[05/12] hadoop git commit: HDFS-8077. Erasure coding: fix bugs in EC
zone and symlinks. Contributed by Jing Zhao and Zhe Zhang.
Posted by zh...@apache.org.
HDFS-8077. Erasure coding: fix bugs in EC zone and symlinks. Contributed by Jing Zhao and Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58d9f264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58d9f264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58d9f264
Branch: refs/heads/HDFS-7285
Commit: 58d9f264645bbb10c522ce5df7d69ed6ec8b3e2d
Parents: cc2202c
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Apr 9 17:53:22 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700
----------------------------------------------------------------------
.../BlockInfoStripedUnderConstruction.java | 2 +-
.../hdfs/server/blockmanagement/BlockManager.java | 12 ++++++------
.../server/namenode/ErasureCodingZoneManager.java | 7 +++++++
.../hadoop/hdfs/server/namenode/FSDirectory.java | 4 ++--
.../hdfs/server/namenode/FSEditLogLoader.java | 11 ++++++-----
.../hdfs/server/namenode/FSImageSerialization.java | 4 ++--
.../hadoop/hdfs/server/namenode/INodeFile.java | 17 ++++-------------
.../hdfs/server/namenode/TestFSEditLogLoader.java | 4 ++--
.../hadoop/hdfs/server/namenode/TestFSImage.java | 2 +-
.../server/namenode/TestRecoverStripedBlocks.java | 2 +-
10 files changed, 32 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
index cfaf3a0..0373314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@ -96,7 +96,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
for(int i = 0; i < numLocations; i++) {
// when creating a new block we simply sequentially assign block index to
// each storage
- Block blk = new Block(this.getBlockId() + i, this.getGenerationStamp(), 0);
+ Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp());
replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
ReplicaState.RBW);
}
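The one-line fix above corrects the argument order when seeding striped replicas: the Block constructor takes (blockId, numBytes, generationStamp), so the old call recorded the genstamp as the length and vice versa. A small sketch of the corrected ordering, with illustrative values:

    import org.apache.hadoop.hdfs.protocol.Block;

    public class BlockCtorOrderSketch {
      public static void main(String[] args) {
        long blockId = 1000L;
        long numBytes = 0L;        // a freshly allocated striped replica has no data yet
        long genStamp = 2001L;     // illustrative generation stamp
        // Correct order per the fix: (blockId, numBytes, generationStamp).
        Block blk = new Block(blockId, numBytes, genStamp);
        System.out.println("len=" + blk.getNumBytes()
            + " gs=" + blk.getGenerationStamp());
      }
    }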
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fcf1421..94aafc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2450,12 +2450,12 @@ public class BlockManager {
case COMMITTED:
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
- return new BlockToMarkCorrupt(reported, storedBlock,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes(),
@@ -2466,7 +2466,7 @@ public class BlockManager {
case UNDER_CONSTRUCTION:
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
"block is " + ucState + " and reported state " + reportedState
+ ", But reported genstamp " + reportedGS
+ " does not match genstamp in block map "
@@ -2482,7 +2482,7 @@ public class BlockManager {
return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2497,7 +2497,7 @@ public class BlockManager {
"complete with the same genstamp");
return null;
} else {
- return new BlockToMarkCorrupt(reported, storedBlock,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"reported replica has invalid state " + reportedState,
Reason.INVALID_STATE);
}
@@ -2510,7 +2510,7 @@ public class BlockManager {
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
- return new BlockToMarkCorrupt(reported, storedBlock, msg,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
Reason.INVALID_STATE);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index d4ff7c5..606e804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@ -59,6 +59,13 @@ public class ErasureCodingZoneManager {
if (inode == null) {
continue;
}
+ // We don't allow symlinks in an EC zone, or pointing to a file/dir in
+ // an EC. Therefore if a symlink is encountered, the dir shouldn't have
+ // EC
+ // TODO: properly support symlinks in EC zones
+ if (inode.isSymlink()) {
+ return false;
+ }
final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
new ArrayList<XAttr>(0)
: inode.getXAttrFeature().getXAttrs();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 53d2040..d68128a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -468,8 +468,8 @@ public class FSDirectory implements Closeable {
try {
INodesInPath iip = addINode(existing, newNode);
if (iip != null) {
- // TODO: we will no longer use storage policy for "Erasure Coding Zone"
- if (newNode.isStriped()) {
+ // check if the file is in an EC zone
+ if (getECPolicy(iip)) {
newNode.addStripedBlocksFeature();
}
if (aclEntries != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 89cfe05..f530772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -417,7 +417,7 @@ public class FSEditLogLoader {
newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
// TODO whether the file is striped should later be retrieved from iip
- updateBlocks(fsDir, addCloseOp, iip, newFile, newFile.isStriped());
+ updateBlocks(fsDir, addCloseOp, iip, newFile, fsDir.getECPolicy(iip));
break;
}
case OP_CLOSE: {
@@ -438,7 +438,7 @@ public class FSEditLogLoader {
file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
// TODO whether the file is striped should later be retrieved from iip
- updateBlocks(fsDir, addCloseOp, iip, file, file.isStriped());
+ updateBlocks(fsDir, addCloseOp, iip, file, fsDir.getECPolicy(iip));
// Now close the file
if (!file.isUnderConstruction() &&
@@ -497,7 +497,7 @@ public class FSEditLogLoader {
INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
// Update in-memory data structures
// TODO whether the file is striped should later be retrieved from iip
- updateBlocks(fsDir, updateOp, iip, oldFile, oldFile.isStriped());
+ updateBlocks(fsDir, updateOp, iip, oldFile, fsDir.getECPolicy(iip));
if (toAddRetryCache) {
fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@@ -511,10 +511,11 @@ public class FSEditLogLoader {
FSNamesystem.LOG.debug(op.opCode + ": " + path +
" new block id : " + addBlockOp.getLastBlock().getBlockId());
}
- INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
+ INodesInPath iip = fsDir.getINodesInPath(path, true);
+ INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
// add the new block to the INodeFile
// TODO whether the file is striped should later be retrieved from iip
- addNewBlock(addBlockOp, oldFile, oldFile.isStriped());
+ addNewBlock(addBlockOp, oldFile, fsDir.getECPolicy(iip));
break;
}
case OP_SET_REPLICATION: {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 1e58858..58244e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -207,7 +207,7 @@ public class FSImageSerialization {
out.writeLong(cons.getModificationTime());
out.writeLong(cons.getPreferredBlockSize());
// whether the file has striped blocks
- out.writeBoolean(cons.isWithStripedBlocks());
+ out.writeBoolean(cons.isStriped());
writeBlocks(cons.getBlocks(), out);
cons.getPermissionStatus().write(out);
@@ -233,7 +233,7 @@ public class FSImageSerialization {
out.writeLong(file.getAccessTime());
out.writeLong(file.getPreferredBlockSize());
// whether the file has striped blocks
- out.writeBoolean(file.isWithStripedBlocks());
+ out.writeBoolean(file.isStriped());
writeBlocks(file.getBlocks(), out);
SnapshotFSImageFormat.saveFileDiffList(file, out);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index f95e54e..b5c510e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -185,17 +185,13 @@ public class INodeFile extends INodeWithAdditionalFields
public FileWithStripedBlocksFeature addStripedBlocksFeature() {
assert blocks == null || blocks.length == 0:
"The file contains contiguous blocks";
- assert !isWithStripedBlocks();
+ assert !isStriped();
this.setFileReplication((short) 0);
FileWithStripedBlocksFeature sb = new FileWithStripedBlocksFeature();
addFeature(sb);
return sb;
}
- public boolean isWithStripedBlocks() {
- return getStripedBlocksFeature() != null;
- }
-
/** Used to make sure there is no contiguous block related info */
private boolean hasNoContiguousBlock() {
return (blocks == null || blocks.length == 0) && getFileReplication() == 0;
@@ -431,7 +427,7 @@ public class INodeFile extends INodeWithAdditionalFields
/** Set the replication factor of this file. */
public final INodeFile setFileReplication(short replication,
int latestSnapshotId) throws QuotaExceededException {
- Preconditions.checkState(!isWithStripedBlocks(),
+ Preconditions.checkState(!isStriped(),
"Cannot set replication to a file with striped blocks");
recordModification(latestSnapshotId);
setFileReplication(replication);
@@ -653,7 +649,7 @@ public class INodeFile extends INodeWithAdditionalFields
long nsDelta = 1;
final long ssDeltaNoReplication;
short replication;
- if (isWithStripedBlocks()) {
+ if (isStriped()) {
return computeQuotaUsageWithStriped(bsps, counts);
}
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
@@ -695,11 +691,6 @@ public class INodeFile extends INodeWithAdditionalFields
/**
* Compute quota of striped file
- * @param bsps
- * @param counts
- * @param useCache
- * @param lastSnapshotId
- * @return quota counts
*/
public final QuotaCounts computeQuotaUsageWithStriped(
BlockStoragePolicySuite bsps, QuotaCounts counts) {
@@ -828,7 +819,7 @@ public class INodeFile extends INodeWithAdditionalFields
* Use preferred block size for the last block if it is under construction.
*/
public final long storagespaceConsumed() {
- if (isWithStripedBlocks()) {
+ if (isStriped()) {
return storagespaceConsumedWithStriped();
} else {
return storagespaceConsumedNoReplication() * getBlockReplication();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 407d07e..0eeb7f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -472,7 +472,7 @@ public class TestFSEditLogLoader {
INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory()
.getINode(testFilePath);
- assertTrue(inodeLoaded.isWithStripedBlocks());
+ assertTrue(inodeLoaded.isStriped());
BlockInfoStriped[] blks = (BlockInfoStriped[])inodeLoaded.getBlocks();
assertEquals(1, blks.length);
@@ -551,7 +551,7 @@ public class TestFSEditLogLoader {
INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory()
.getINode(testFilePath);
- assertTrue(inodeLoaded.isWithStripedBlocks());
+ assertTrue(inodeLoaded.isStriped());
BlockInfoStriped[] blks = (BlockInfoStriped[])inodeLoaded.getBlocks();
assertEquals(1, blks.length);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 83f01c6..a456cad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -199,7 +199,7 @@ public class TestFSImage {
assertEquals(mtime, fileByLoaded.getModificationTime());
assertEquals(isUC ? mtime : atime, fileByLoaded.getAccessTime());
assertEquals(0, fileByLoaded.getContiguousBlocks().length);
- assertEquals(0, fileByLoaded.getBlockReplication());
+ assertEquals(0, fileByLoaded.getFileReplication());
assertEquals(preferredBlockSize, fileByLoaded.getPreferredBlockSize());
//check the BlockInfoStriped
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index b2ff6c8..4292f9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -84,7 +84,7 @@ public class TestRecoverStripedBlocks {
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
.getINode4Write(filePath.toString()).asFile();
assertFalse(fileNode.isUnderConstruction());
- assertTrue(fileNode.isWithStripedBlocks());
+ assertTrue(fileNode.isStriped());
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(numBlocks, blocks.length);
for (BlockInfo blk : blocks) {
[11/12] hadoop git commit: HDFS-8090. Erasure Coding: Add RPC to
client-namenode to list all ECSchemas loaded in Namenode. (Contributed by
Vinayakumar B)
Posted by zh...@apache.org.
HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all ECSchemas loaded in Namenode. (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06562136
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06562136
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06562136
Branch: refs/heads/HDFS-7285
Commit: 065621364592d68fabc98d2f8f7aca50d2cbdd8b
Parents: 0331f20
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Apr 10 15:07:32 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/io/erasurecode/ECSchema.java | 4 +-
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 5 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 11 ++++
.../hadoop/hdfs/protocol/ClientProtocol.java | 10 ++++
...tNamenodeProtocolServerSideTranslatorPB.java | 19 +++++++
.../ClientNamenodeProtocolTranslatorPB.java | 26 ++++++++-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 5 +-
.../hdfs/server/namenode/FSNamesystem.java | 17 ++++++
.../hdfs/server/namenode/NameNodeRpcServer.java | 9 +++-
.../src/main/proto/ClientNamenodeProtocol.proto | 9 ++++
.../hadoop-hdfs/src/main/proto/hdfs.proto | 3 +-
.../org/apache/hadoop/hdfs/TestECSchemas.java | 57 ++++++++++++++++++++
12 files changed, 164 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 8c3310e..32077f6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -123,12 +123,12 @@ public final class ECSchema {
this.chunkSize = DEFAULT_CHUNK_SIZE;
try {
- if (options.containsKey(CHUNK_SIZE_KEY)) {
+ if (this.options.containsKey(CHUNK_SIZE_KEY)) {
this.chunkSize = Integer.parseInt(options.get(CHUNK_SIZE_KEY));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Option value " +
- options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
+ this.options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
" is found. It should be an integer");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 753795a..5250dfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -58,4 +58,7 @@
HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
- HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file
+ HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
+
+ HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all
+ ECSchemas loaded in Namenode. (vinayakumarb)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index b2a69dd..6339d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -163,6 +163,7 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -3105,6 +3106,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ public ECSchema[] getECSchemas() throws IOException {
+ checkOpen();
+ TraceScope scope = Trace.startSpan("getECSchemas", traceSampler);
+ try {
+ return namenode.getECSchemas();
+ } finally {
+ scope.close();
+ }
+ }
+
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
return new DFSInotifyEventInputStream(traceSampler, namenode);
}
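
The new DFSClient#getECSchemas() call is the client-side entry point for this RPC. A minimal usage sketch (illustrative only; it assumes a DistributedFileSystem handle and reaches the DFSClient the same way the TestECSchemas test added below does, via getClient()):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class ListECSchemas {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster running this branch.
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(new Configuration());
    ECSchema[] schemas = dfs.getClient().getECSchemas(); // new client -> NameNode RPC
    for (ECSchema schema : schemas) {
      // Until HDFS-7866 the NameNode only returns the system default schema.
      System.out.println(schema);
    }
  }
}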
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 45d92f3..7f5ac49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.AccessControlException;
@@ -1474,4 +1475,13 @@ public interface ClientProtocol {
*/
@Idempotent
public ECInfo getErasureCodingInfo(String src) throws IOException;
+
+ /**
+ * Gets list of ECSchemas loaded in Namenode
+ *
+ * @return Returns the list of ECSchemas loaded at Namenode
+ * @throws IOException
+ */
+ @Idempotent
+ public ECSchema[] getECSchemas() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 1493e52..33207ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -106,6 +106,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
@@ -218,6 +220,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -1530,4 +1533,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
+
+ @Override
+ public GetECSchemasResponseProto getECSchemas(RpcController controller,
+ GetECSchemasRequestProto request) throws ServiceException {
+ try {
+ ECSchema[] ecSchemas = server.getECSchemas();
+ GetECSchemasResponseProto.Builder resBuilder = GetECSchemasResponseProto
+ .newBuilder();
+ for (ECSchema ecSchema : ecSchemas) {
+ resBuilder.addSchemas(PBHelper.convertECSchema(ecSchema));
+ }
+ return resBuilder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 568da68..0211522 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -107,6 +107,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
@@ -165,10 +167,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -180,6 +183,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -237,6 +241,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
VOID_GET_STORAGE_POLICIES_REQUEST =
GetStoragePoliciesRequestProto.newBuilder().build();
+ private final static GetECSchemasRequestProto
+ VOID_GET_ECSCHEMAS_REQUEST = GetECSchemasRequestProto
+ .newBuilder().build();
+
public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
rpcProxy = proxy;
}
@@ -1550,4 +1558,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public ECSchema[] getECSchemas() throws IOException {
+ try {
+ GetECSchemasResponseProto response = rpcProxy.getECSchemas(null,
+ VOID_GET_ECSCHEMAS_REQUEST);
+ ECSchema[] schemas = new ECSchema[response.getSchemasCount()];
+ int i = 0;
+ for (ECSchemaProto schemaProto : response.getSchemasList()) {
+ schemas[i++] = PBHelper.convertECSchema(schemaProto);
+ }
+ return schemas;
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index afdb302..3d0e9bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3122,8 +3122,6 @@ public class PBHelper {
for (ECSchemaOptionEntryProto option : optionsList) {
options.put(option.getKey(), option.getValue());
}
- // include chunksize in options.
- options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
schema.getDataUnits(), schema.getParityUnits(), options);
}
@@ -3133,8 +3131,7 @@ public class PBHelper {
.setSchemaName(schema.getSchemaName())
.setCodecName(schema.getCodecName())
.setDataUnits(schema.getNumDataUnits())
- .setParityUnits(schema.getNumParityUnits())
- .setChunkSize(schema.getChunkSize());
+ .setParityUnits(schema.getNumParityUnits());
Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
for (Entry<String, String> entry : entrySet) {
builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4c86bd3..a504907 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8169,6 +8169,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return null;
}
+ /**
+ * Get available ECSchemas
+ */
+ ECSchema[] getECSchemas() throws IOException {
+ checkOperation(OperationCategory.READ);
+ waitForLoadingFSImage();
+ readLock();
+ try {
+ checkOperation(OperationCategory.READ);
+ // TODO HDFS-7866 Need to return all schemas maintained by Namenode
+ ECSchema defaultSchema = ECSchemaManager.getSystemDefaultSchema();
+ return new ECSchema[] { defaultSchema };
+ } finally {
+ readUnlock();
+ }
+ }
+
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
boolean logRetryCache)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index fe9b901..85ac1e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetryCache;
@@ -2044,9 +2045,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
nn.spanReceiverHost.removeSpanReceiver(id);
}
- @Override // ClientNameNodeProtocol
+ @Override // ClientProtocol
public ECInfo getErasureCodingInfo(String src) throws IOException {
checkNNStartup();
return namesystem.getErasureCodingInfo(src);
}
+
+ @Override // ClientProtocol
+ public ECSchema[] getECSchemas() throws IOException {
+ checkNNStartup();
+ return namesystem.getECSchemas();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 9488aed..3389a22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -729,6 +729,13 @@ message GetErasureCodingInfoResponseProto {
optional ECInfoProto ECInfo = 1;
}
+message GetECSchemasRequestProto { // void request
+}
+
+message GetECSchemasResponseProto {
+ repeated ECSchemaProto schemas = 1;
+}
+
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@@ -879,4 +886,6 @@ service ClientNamenodeProtocol {
returns(GetEditsFromTxidResponseProto);
rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
returns(GetErasureCodingInfoResponseProto);
+ rpc getECSchemas(GetECSchemasRequestProto)
+ returns(GetECSchemasResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 1314ea0..0507538 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -637,8 +637,7 @@ message ECSchemaProto {
required string codecName = 2;
required uint32 dataUnits = 3;
required uint32 parityUnits = 4;
- required uint32 chunkSize = 5;
- repeated ECSchemaOptionEntryProto options = 6;
+ repeated ECSchemaOptionEntryProto options = 5;
}
/**
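
With the dedicated chunkSize field gone from ECSchemaProto, a non-default chunk size now travels through the repeated options entries and is parsed by the ECSchema constructor. A minimal sketch of that convention (the schema and codec names below are illustrative, not values defined by this patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class ChunkSizeViaOptions {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    // Optional entry: when absent, ECSchema falls back to its default chunk size.
    options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(64 * 1024));
    ECSchema schema = new ECSchema("RS-6-3", "rs", 6, 3, options);
    System.out.println(schema.getChunkSize()); // prints 65536
  }
}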
http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
new file mode 100644
index 0000000..07e1359
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestECSchemas {
+ private MiniDFSCluster cluster;
+
+ @Before
+ public void before() throws IOException {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(0)
+ .build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void after() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetECSchemas() throws Exception {
+ ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas();
+ // TODO update assertion after HDFS-7866
+ assertNotNull(ecSchemas);
+ assertEquals("Should have only one ecSchema", 1, ecSchemas.length);
+ assertEquals("Returned schemas should have only default schema",
+ ECSchemaManager.getSystemDefaultSchema(), ecSchemas[0]);
+ }
+}
[06/12] hadoop git commit: HDFS-8104 Make hard-coded values
consistent with the system default schema first before removing them.
Contributed by Kai Zheng
Posted by zh...@apache.org.
HDFS-8104 Make hard-coded values consistent with the system default schema first before removing them. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8c0ecfb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8c0ecfb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8c0ecfb
Branch: refs/heads/HDFS-7285
Commit: d8c0ecfb0be9c998a32480d14b392a378cc9e6a9
Parents: cc11b5c
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 10 00:16:28 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 4 +-
.../hadoop/hdfs/protocol/HdfsConstants.java | 12 +-
.../hadoop/hdfs/TestPlanReadPortions.java | 142 +++++++++++++++++++
.../apache/hadoop/hdfs/TestReadStripedFile.java | 112 ---------------
4 files changed, 154 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 5078a15..1e695c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -54,4 +54,6 @@
HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
NameNode (vinayakumarb)
- HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
\ No newline at end of file
+ HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
+
+ HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index a888aa4..11c5260 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -180,11 +180,17 @@ public class HdfsConstants {
public static final byte WARM_STORAGE_POLICY_ID = 5;
public static final byte COLD_STORAGE_POLICY_ID = 2;
- public static final byte NUM_DATA_BLOCKS = 3;
- public static final byte NUM_PARITY_BLOCKS = 2;
+
public static final long BLOCK_GROUP_INDEX_MASK = 15;
public static final byte MAX_BLOCKS_IN_GROUP = 16;
+ /*
+ * These values correspond to the values used by the system default schema.
+ * TODO: to be removed once all places use schema.
+ */
+
+ public static final byte NUM_DATA_BLOCKS = 6;
+ public static final byte NUM_PARITY_BLOCKS = 3;
// The chunk size for striped block which is used by erasure coding
- public static final int BLOCK_STRIPED_CELL_SIZE = 128 * 1024;
+ public static final int BLOCK_STRIPED_CELL_SIZE = 256 * 1024;
}
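
To make the new defaults concrete: with 6 data blocks, 3 parity blocks and 256 KB cells, one full stripe carries 6 * 256 KB = 1.5 MB of user data plus 768 KB of parity, and cells are laid out round-robin across the data blocks of a block group. The arithmetic below illustrates that layout; it is not code from the patch:

public class StripeLayoutExample {
  static final int NUM_DATA_BLOCKS = 6;        // HdfsConstants.NUM_DATA_BLOCKS
  static final int CELL_SIZE = 256 * 1024;     // HdfsConstants.BLOCK_STRIPED_CELL_SIZE

  public static void main(String[] args) {
    long offset = 1000000L;                         // an arbitrary logical offset in the file
    long cellIndex = offset / CELL_SIZE;            // global cell index
    long blockIndex = cellIndex % NUM_DATA_BLOCKS;  // data block within the group (round-robin)
    long stripeIndex = cellIndex / NUM_DATA_BLOCKS; // stripe within the group
    long offsetInCell = offset % CELL_SIZE;
    System.out.println("offset " + offset + " -> stripe " + stripeIndex
        + ", data block " + blockIndex + ", offset in cell " + offsetInCell);
  }
}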
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
new file mode 100644
index 0000000..cf84b30
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.*;
+
+public class TestPlanReadPortions {
+
+ // We only support this as num of data blocks. It might be good enough for now
+ // for the purpose, even not flexible yet for any number in a schema.
+ private final short GROUP_SIZE = 3;
+ private final int CELLSIZE = 128 * 1024;
+
+ private void testPlanReadPortions(int startInBlk, int length,
+ int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+ int[][] bufferOffsets, int[][] bufferLengths) {
+ ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+ CELLSIZE, startInBlk, length, bufferOffset);
+ assertEquals(GROUP_SIZE, results.length);
+
+ for (int i = 0; i < GROUP_SIZE; i++) {
+ assertEquals(readLengths[i], results[i].getReadLength());
+ assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+ final int[] bOffsets = results[i].getOffsets();
+ assertArrayEquals(bufferOffsets[i], bOffsets);
+ final int[] bLengths = results[i].getLengths();
+ assertArrayEquals(bufferLengths[i], bLengths);
+ }
+ }
+
+ /**
+ * Test {@link DFSStripedInputStream#planReadPortions}
+ */
+ @Test
+ public void testPlanReadPortions() {
+ /**
+ * start block offset is 0, read cellSize - 10
+ */
+ testPlanReadPortions(0, CELLSIZE - 10, 0,
+ new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 3 * cellSize
+ */
+ testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 0, read cellSize + 10
+ */
+ testPlanReadPortions(0, CELLSIZE + 10, 0,
+ new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+ new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+ new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+ /**
+ * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+ */
+ testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+ new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+ new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+ new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, 10}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+ new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+ new int[]{100 + CELLSIZE - 2},
+ new int[]{100 + CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 2},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is 2, read 3 * cellSize + 10
+ */
+ testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+ new int[]{2, 0, 0},
+ new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+ new int[]{CELLSIZE - 2},
+ new int[]{CELLSIZE * 2 - 2}},
+ new int[][]{new int[]{CELLSIZE - 2, 12},
+ new int[]{CELLSIZE},
+ new int[]{CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+ new int[]{CELLSIZE, CELLSIZE - 1, 0},
+ new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+ new int[]{1, 3 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE},
+ new int[]{1, CELLSIZE, 9},
+ new int[]{CELLSIZE, CELLSIZE}});
+
+ /**
+ * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+ */
+ testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+ new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+ new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+ new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+ new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+ new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+ new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+ new int[]{CELLSIZE, CELLSIZE, 9},
+ new int[]{1, CELLSIZE, CELLSIZE}});
+ }
+}
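
As a sanity check on the expectations above, here is an illustrative by-hand recomputation (not code from the patch) of the "read cellSize + 10 from offset 0" case with GROUP_SIZE = 3 and CELLSIZE = 128 KB: cells rotate round-robin over the three data blocks, so the read covers the whole first cell on block 0 and the first 10 bytes on block 1, leaving block 2 idle.

import java.util.Arrays;

public class PlanReadPortionsByHand {
  public static void main(String[] args) {
    final int CELLSIZE = 128 * 1024;
    final int GROUP_SIZE = 3;
    final int length = CELLSIZE + 10;               // the read span, starting at offset 0
    int[] readLengths = new int[GROUP_SIZE];
    for (int off = 0; off < length; ) {
      int cell = off / CELLSIZE;                    // global cell index
      int block = cell % GROUP_SIZE;                // round-robin target data block
      int inCell = Math.min(CELLSIZE - off % CELLSIZE, length - off);
      readLengths[block] += inCell;
      off += inCell;
    }
    System.out.println(Arrays.toString(readLengths)); // [131072, 10, 0], matching the test vector
  }
}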
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 0032bdd..849e12e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -78,117 +77,6 @@ public class TestReadStripedFile {
}
}
- private void testPlanReadPortions(int startInBlk, int length,
- int bufferOffset, int[] readLengths, int[] offsetsInBlock,
- int[][] bufferOffsets, int[][] bufferLengths) {
- ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
- CELLSIZE, startInBlk, length, bufferOffset);
- assertEquals(GROUP_SIZE, results.length);
-
- for (int i = 0; i < GROUP_SIZE; i++) {
- assertEquals(readLengths[i], results[i].getReadLength());
- assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
- final int[] bOffsets = results[i].getOffsets();
- assertArrayEquals(bufferOffsets[i], bOffsets);
- final int[] bLengths = results[i].getLengths();
- assertArrayEquals(bufferLengths[i], bLengths);
- }
- }
-
- /**
- * Test {@link DFSStripedInputStream#planReadPortions}
- */
- @Test
- public void testPlanReadPortions() {
- /**
- * start block offset is 0, read cellSize - 10
- */
- testPlanReadPortions(0, CELLSIZE - 10, 0,
- new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
- new int[][]{new int[]{0}, new int[]{}, new int[]{}},
- new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
-
- /**
- * start block offset is 0, read 3 * cellSize
- */
- testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
- new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
- new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
- new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
-
- /**
- * start block offset is 0, read cellSize + 10
- */
- testPlanReadPortions(0, CELLSIZE + 10, 0,
- new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
- new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
- new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
-
- /**
- * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
- */
- testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
- new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
- new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
- new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
- new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
- new int[][]{new int[]{CELLSIZE, CELLSIZE},
- new int[]{CELLSIZE, CELLSIZE},
- new int[]{CELLSIZE, 10}});
-
- /**
- * start block offset is 2, read 3 * cellSize
- */
- testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
- new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
- new int[]{2, 0, 0},
- new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
- new int[]{100 + CELLSIZE - 2},
- new int[]{100 + CELLSIZE * 2 - 2}},
- new int[][]{new int[]{CELLSIZE - 2, 2},
- new int[]{CELLSIZE},
- new int[]{CELLSIZE}});
-
- /**
- * start block offset is 2, read 3 * cellSize + 10
- */
- testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
- new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
- new int[]{2, 0, 0},
- new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
- new int[]{CELLSIZE - 2},
- new int[]{CELLSIZE * 2 - 2}},
- new int[][]{new int[]{CELLSIZE - 2, 12},
- new int[]{CELLSIZE},
- new int[]{CELLSIZE}});
-
- /**
- * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
- */
- testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
- new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
- new int[]{CELLSIZE, CELLSIZE - 1, 0},
- new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
- new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
- new int[]{1, 3 * CELLSIZE + 1}},
- new int[][]{new int[]{CELLSIZE, CELLSIZE},
- new int[]{1, CELLSIZE, 9},
- new int[]{CELLSIZE, CELLSIZE}});
-
- /**
- * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
- */
- testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
- new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
- new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
- new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
- new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
- new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
- new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
- new int[]{CELLSIZE, CELLSIZE, 9},
- new int[]{1, CELLSIZE, CELLSIZE}});
- }
-
private LocatedStripedBlock createDummyLocatedBlock() {
final long blockGroupID = -1048576;
DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
[10/12] hadoop git commit: HDFS-7889 Subclass DFSOutputStream to
support writing striping layout files. Contributed by Li Bo
Posted by zh...@apache.org.
HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0331f20a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0331f20a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0331f20a
Branch: refs/heads/HDFS-7285
Commit: 0331f20aec85312eec1093a5030fb3d94d5cedef
Parents: 58d9f26
Author: Kai Zheng <ka...@intel.com>
Authored: Sat Apr 11 01:03:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 4 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 13 +-
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 26 +-
.../hadoop/hdfs/DFSStripedOutputStream.java | 439 +++++++++++++++++++
.../org/apache/hadoop/hdfs/DataStreamer.java | 12 +-
.../apache/hadoop/hdfs/StripedDataStreamer.java | 241 ++++++++++
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 311 +++++++++++++
7 files changed, 1031 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1e695c4..753795a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -56,4 +56,6 @@
HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
- HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
+ HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
+
+ HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 8cde274..8270331 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -268,8 +268,14 @@ public class DFSOutputStream extends FSOutputSummer
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
+ final DFSOutputStream out;
+ if(stat.getReplication() == 0) {
+ out = new DFSStripedOutputStream(dfsClient, src, stat,
+ flag, progress, checksum, favoredNodes);
+ } else {
+ out = new DFSOutputStream(dfsClient, src, stat,
+ flag, progress, checksum, favoredNodes);
+ }
out.start();
return out;
} finally {
@@ -347,6 +353,9 @@ public class DFSOutputStream extends FSOutputSummer
String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForAppend", src);
+ if(stat.getReplication() == 0) {
+ throw new IOException("Not support appending to a striping layout file yet.");
+ }
try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
progress, lastBlock, stat, checksum);
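
The factory now dispatches on HdfsFileStatus#getReplication(): striped files carry replication 0 (set by addStripedBlocksFeature earlier on this branch), so create() returns a DFSStripedOutputStream while append() is rejected for such files. From an application's point of view nothing changes. A hedged sketch, assuming /ecZone was previously made an erasure coding zone and the path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteIntoECZone {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // The wrapped stream is a DFSStripedOutputStream because the file's
    // HdfsFileStatus reports replication 0 under the EC zone.
    FSDataOutputStream out = fs.create(new Path("/ecZone/file1"));
    try {
      out.write("hello striped layout".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}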
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 22055c3..9cd1ec1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
@@ -113,6 +114,19 @@ class DFSPacket {
dataPos += len;
}
+ synchronized void writeData(ByteBuffer inBuffer, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
+ if (dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ for (int i = 0; i < len; i++) {
+ buf[dataPos + i] = inBuffer.get();
+ }
+ dataPos += len;
+ }
+
/**
* Write checksums to this packet
*
@@ -222,7 +236,7 @@ class DFSPacket {
*
* @return true if the packet is the last packet
*/
- boolean isLastPacketInBlock(){
+ boolean isLastPacketInBlock() {
return lastPacketInBlock;
}
@@ -231,7 +245,7 @@ class DFSPacket {
*
* @return the sequence number of this packet
*/
- long getSeqno(){
+ long getSeqno() {
return seqno;
}
@@ -240,14 +254,14 @@ class DFSPacket {
*
* @return the number of chunks in this packet
*/
- synchronized int getNumChunks(){
+ synchronized int getNumChunks() {
return numChunks;
}
/**
* increase the number of chunks by one
*/
- synchronized void incNumChunks(){
+ synchronized void incNumChunks() {
numChunks++;
}
@@ -256,7 +270,7 @@ class DFSPacket {
*
* @return the maximum number of packets
*/
- int getMaxChunks(){
+ int getMaxChunks() {
return maxChunks;
}
@@ -265,7 +279,7 @@ class DFSPacket {
*
* @param syncBlock if to sync block
*/
- synchronized void setSyncBlock(boolean syncBlock){
+ synchronized void setSyncBlock(boolean syncBlock) {
this.syncBlock = syncBlock;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
new file mode 100644
index 0000000..aded4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+
+/****************************************************************
+ * The DFSStripedOutputStream class supports writing files in striped
+ * layout. Each stripe contains a sequence of cells and multiple
+ * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
+ * for writing the cells to different datanodes.
+ *
+ ****************************************************************/
+
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+
+ private final List<StripedDataStreamer> streamers;
+ /**
+ * Size of each striping cell, must be a multiple of bytesPerChecksum
+ */
+ private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private ByteBuffer[] cellBuffers;
+ private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private int curIdx = 0;
+ /* bytes written in current block group */
+ private long currentBlockGroupBytes = 0;
+
+ //TODO: Use ErasureCoder interface (HDFS-7781)
+ private RawErasureEncoder encoder;
+
+ private StripedDataStreamer getLeadingStreamer() {
+ return streamers.get(0);
+ }
+
+ private long getBlockGroupSize() {
+ return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
+ }
+
+ /** Construct a new output stream for creating a file. */
+ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ EnumSet<CreateFlag> flag, Progressable progress,
+ DataChecksum checksum, String[] favoredNodes)
+ throws IOException {
+ super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+ DFSClient.LOG.info("Creating striped output stream");
+ if (blockGroupBlocks <= 1) {
+ throw new IOException("The block group must contain more than one block.");
+ }
+
+ cellBuffers = new ByteBuffer[blockGroupBlocks];
+ List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+ try {
+ cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+ } catch (InterruptedException ie) {
+ final InterruptedIOException iioe = new InterruptedIOException(
+ "create cell buffers");
+ iioe.initCause(ie);
+ throw iioe;
+ }
+ }
+ encoder = new RSRawEncoder();
+ encoder.initialize(blockGroupDataBlocks,
+ blockGroupBlocks - blockGroupDataBlocks, cellSize);
+
+ streamers = new ArrayList<>(blockGroupBlocks);
+ for (short i = 0; i < blockGroupBlocks; i++) {
+ StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+ dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+ i, stripeBlocks);
+ if (favoredNodes != null && favoredNodes.length != 0) {
+ streamer.setFavoredNodes(favoredNodes);
+ }
+ streamers.add(streamer);
+ }
+
+ refreshStreamer();
+ }
+
+ private void refreshStreamer() {
+ streamer = streamers.get(curIdx);
+ }
+
+ private void moveToNextStreamer() {
+ curIdx = (curIdx + 1) % blockGroupBlocks;
+ refreshStreamer();
+ }
+
+ /**
+ * encode the buffers.
+ * After encoding, flip each buffer.
+ *
+ * @param buffers data buffers + parity buffers
+ */
+ private void encode(ByteBuffer[] buffers) {
+ ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
+ ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ if (i < blockGroupDataBlocks) {
+ dataBuffers[i] = buffers[i];
+ } else {
+ parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+ }
+ }
+ encoder.encode(dataBuffers, parityBuffers);
+ }
+
+ /**
+ * Generate packets from a given buffer
+ *
+ * @param byteBuffer the buffer from which packets are generated
+ * @return packets generated
+ * @throws IOException
+ */
+ private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
+ throws IOException{
+ List<DFSPacket> packets = new ArrayList<>();
+ while (byteBuffer.remaining() > 0) {
+ DFSPacket p = createPacket(packetSize, chunksPerPacket,
+ streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), false);
+ int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
+ int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
+ maxBytesToPacket: byteBuffer.remaining();
+ p.writeData(byteBuffer, toWrite);
+ streamer.incBytesCurBlock(toWrite);
+ packets.add(p);
+ }
+ return packets;
+ }
+
+ @Override
+ protected synchronized void writeChunk(byte[] b, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
+ super.writeChunk(b, offset, len, checksum, ckoff, cklen);
+
+ if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
+ addToCellBuffer(b, offset, len);
+ } else {
+ String msg = "Writing a chunk should not overflow the cell buffer.";
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ }
+
+
+ // If current packet has not been enqueued for transmission,
+ // but the cell buffer is full, we need to enqueue the packet
+ if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
+ currentPacket.getSeqno() +
+ ", curIdx=" + curIdx +
+ ", src=" + src +
+ ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+ ", blockSize=" + blockSize +
+ ", appendChunk=" + streamer.getAppendChunk());
+ }
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ // Two extra steps are needed when a striping cell is full:
+ // 1. Forward the current index pointer
+ // 2. Generate parity packets if a full stripe of data cells is present
+ if (getSizeOfCellnBuffer(curIdx) == cellSize) {
+ //move curIdx to next cell
+ moveToNextStreamer();
+ //When all data cells in a stripe are ready, we need to encode
+ //them and generate some parity cells. These cells will be
+ //converted to packets and put into their DataStreamer's queue.
+ if (curIdx == blockGroupDataBlocks) {
+ //encode the data cells
+ for (int k = 0; k < blockGroupDataBlocks; k++) {
+ cellBuffers[k].flip();
+ }
+ encode(cellBuffers);
+ for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ ByteBuffer parityBuffer = cellBuffers[i];
+ List<DFSPacket> packets = generatePackets(parityBuffer);
+ for (DFSPacket p : packets) {
+ currentPacket = p;
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ endBlock();
+ moveToNextStreamer();
+ }
+ //clear cellBuffers to prepare for the next stripe
+ clearCellBuffers();
+ }
+ }
+ }
+
+ private void addToCellBuffer(byte[] b, int off, int len) {
+ cellBuffers[curIdx].put(b, off, len);
+ }
+
+ private int getSizeOfCellnBuffer(int cellIndex) {
+ return cellBuffers[cellIndex].position();
+ }
+
+ private void clearCellBuffers() {
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ cellBuffers[i].clear();
+ }
+ }
+
+ private int stripeDataSize() {
+ return blockGroupDataBlocks * cellSize;
+ }
+
+ private void notSupported(String headMsg)
+ throws IOException {
+ throw new IOException(
+ headMsg + " is not currently supported for the striping layout.");
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ notSupported("hflush");
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ notSupported("hsync");
+ }
+
+
+ @Override
+ protected synchronized void start() {
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.start();
+ }
+ }
+
+ @Override
+ synchronized void abort() throws IOException {
+ if (isClosed()) {
+ return;
+ }
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.setLastException(new IOException("Lease timeout of "
+ + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+ }
+ closeThreads(true);
+ dfsClient.endFileLease(fileId);
+ }
+
+ //TODO: Handle slow writers (HDFS-7786)
+ // Currently we only check whether the leading streamer is terminated
+ boolean isClosed() {
+ return closed || getLeadingStreamer().streamerClosed();
+ }
+
+ // shutdown datastreamer and responseprocessor threads.
+ // interrupt datastreamer if force is true
+ @Override
+ protected void closeThreads(boolean force) throws IOException {
+ StripedDataStreamer leadingStreamer = null;
+ for (StripedDataStreamer streamer : streamers) {
+ try {
+ streamer.close(force);
+ streamer.join();
+ streamer.closeSocket();
+ if (streamer.isLeadingStreamer()) {
+ leadingStreamer = streamer;
+ } else {
+ streamer.countTailingBlockGroupBytes();
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to shutdown streamer");
+ } finally {
+ streamer.setSocketToNull();
+ setClosed();
+ }
+ }
+ leadingStreamer.countTailingBlockGroupBytes();
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ super.write(b);
+ currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
+ }
+
+ @Override
+ public synchronized void write(byte b[], int off, int len)
+ throws IOException {
+ super.write(b, off, len);
+ currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
+ }
+
+ private void writeParityCellsForLastStripe() throws IOException {
+ if (currentBlockGroupBytes == 0 ||
+ currentBlockGroupBytes % stripeDataSize() == 0)
+ return;
+ int lastStripeLen = (int) (currentBlockGroupBytes % stripeDataSize());
+ // Size of parity cells should equal the size of the first cell, if it
+ // is not full.
+ int parityCellSize = cellSize;
+ int index = lastStripeLen / cellSize;
+ if (lastStripeLen < cellSize) {
+ parityCellSize = lastStripeLen;
+ index++;
+ }
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ if (i >= index) {
+ int position = cellBuffers[i].position();
+ for (int j = 0; j < parityCellSize - position; j++) {
+ cellBuffers[i].put((byte)0);
+ }
+ }
+ cellBuffers[i].flip();
+ }
+ encode(cellBuffers);
+
+ //write parity cells
+ curIdx = blockGroupDataBlocks;
+ refreshStreamer();
+ for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ ByteBuffer parityBuffer = cellBuffers[i];
+ List<DFSPacket> packets = generatePackets(parityBuffer);
+ for (DFSPacket p : packets) {
+ currentPacket = p;
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ endBlock();
+ moveToNextStreamer();
+ }
+
+ clearCellBuffers();
+ }
+
+ @Override
+ void setClosed() {
+ super.setClosed();
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ byteArrayManager.release(cellBuffers[i].array());
+ streamers.get(i).release();
+ }
+ }
+
+ @Override
+ protected synchronized void closeImpl() throws IOException {
+ if (isClosed()) {
+ IOException e = getLeadingStreamer().getLastException().getAndSet(null);
+ if (e == null)
+ return;
+ else
+ throw e;
+ }
+
+ try {
+ // flush from all upper layers
+ flushBuffer();
+ if (currentPacket != null) {
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ //if the last stripe is incomplete, generate and write parity cells
+ writeParityCellsForLastStripe();
+
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ curIdx = i;
+ refreshStreamer();
+ if (streamer.getBytesCurBlock() != 0 ||
+ currentBlockGroupBytes < getBlockGroupSize()) {
+ // send an empty packet to mark the end of the block
+ currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
+ }
+ // flush all data to Datanode
+ flushInternal();
+ }
+
+ // get last block before destroying the streamer
+ ExtendedBlock lastBlock = streamers.get(0).getBlock();
+ closeThreads(false);
+ TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+ try {
+ completeFile(lastBlock);
+ } finally {
+ scope.close();
+ }
+ dfsClient.endFileLease(fileId);
+ } catch (ClosedChannelException e) {
+ } finally {
+ setClosed();
+ }
+ }
+
+}
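
For reference, a standalone sketch (not part of this patch) of the round-robin cell placement that writeChunk implements above: it maps a logical file offset to the data streamer and the offset inside that cell. The cell size and the 6-data/3-parity layout below are assumed example values, not values read from the patch.

// Standalone sketch, not part of the patch: map a file offset to the data
// streamer index and the offset inside that cell, mirroring the round-robin
// filling of cellBuffers in DFSStripedOutputStream#writeChunk.
public class StripeMappingSketch {
  static final int CELL_SIZE = 64 * 1024;  // assumed striping cell size
  static final int DATA_BLOCKS = 6;        // assumed number of data blocks

  static int streamerIndex(long fileOffset) {
    // Cells are filled in order 0..DATA_BLOCKS-1, then the next stripe starts.
    return (int) ((fileOffset / CELL_SIZE) % DATA_BLOCKS);
  }

  static long offsetInCell(long fileOffset) {
    return fileOffset % CELL_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(streamerIndex(0));                // 0: first cell
    System.out.println(streamerIndex(CELL_SIZE));        // 1: second cell
    System.out.println(streamerIndex(6L * CELL_SIZE));   // 0: next stripe
    System.out.println(offsetInCell(CELL_SIZE + 10));    // 10
  }
}

Parity cells are not addressable this way from a file offset; they are produced by encode(...) only after all data cells of a stripe are full, as in the writeChunk path above.
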
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index d53e73f..1b99d60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -169,7 +169,7 @@ class DataStreamer extends Daemon {
}
private volatile boolean streamerClosed = false;
- private ExtendedBlock block; // its length is number of bytes acked
+ protected ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@@ -177,7 +177,7 @@ class DataStreamer extends Daemon {
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
- private String[] favoredNodes;
+ protected String[] favoredNodes;
volatile boolean hasError = false;
volatile int errorIndex = -1;
// Restarting node index
@@ -204,12 +204,12 @@ class DataStreamer extends Daemon {
private final AtomicReference<IOException> lastException = new AtomicReference<>();
private Socket s;
- private final DFSClient dfsClient;
- private final String src;
+ protected final DFSClient dfsClient;
+ protected final String src;
/** Only for DataTransferProtocol.writeBlock(..) */
private final DataChecksum checksum4WriteBlock;
private final Progressable progress;
- private final HdfsFileStatus stat;
+ protected final HdfsFileStatus stat;
// appending to existing partial block
private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock
@@ -332,7 +332,7 @@ class DataStreamer extends Daemon {
stage = BlockConstructionStage.DATA_STREAMING;
}
- private void endBlock() {
+ protected void endBlock() {
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Closing old block " + block);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
new file mode 100644
index 0000000..710d92d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.List;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/****************************************************************************
+ * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
+ * There are two kinds of StripedDataStreamer: the leading streamer and the
+ * ordinary streamers. The leading streamer requests a block group from the
+ * NameNode, unwraps it into located blocks, and transfers each located block
+ * to its corresponding ordinary streamer via a blocking queue.
+ *
+ ****************************************************************************/
+public class StripedDataStreamer extends DataStreamer {
+ private final short index;
+ private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
+ private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
+ + HdfsConstants.NUM_PARITY_BLOCKS;
+ private boolean hasCommittedBlock = false;
+
+ StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+ DFSClient dfsClient, String src,
+ Progressable progress, DataChecksum checksum,
+ AtomicReference<CachingStrategy> cachingStrategy,
+ ByteArrayManager byteArrayManage, short index,
+ List<BlockingQueue<LocatedBlock>> stripedBlocks) {
+ super(stat,block, dfsClient, src, progress, checksum, cachingStrategy,
+ byteArrayManage);
+ this.index = index;
+ this.stripedBlocks = stripedBlocks;
+ }
+
+ /**
+ * Construct a data streamer for appending to the last partial block
+ * @param lastBlock last block of the file to be appended
+ * @param stat status of the file to be appended
+ * @throws IOException if error occurs
+ */
+ StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
+ DFSClient dfsClient, String src,
+ Progressable progress, DataChecksum checksum,
+ AtomicReference<CachingStrategy> cachingStrategy,
+ ByteArrayManager byteArrayManage, short index,
+ List<BlockingQueue<LocatedBlock>> stripedBlocks)
+ throws IOException {
+ super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
+ byteArrayManage);
+ this.index = index;
+ this.stripedBlocks = stripedBlocks;
+ }
+
+ public boolean isLeadingStreamer () {
+ return index == 0;
+ }
+
+ private boolean isParityStreamer() {
+ return index >= HdfsConstants.NUM_DATA_BLOCKS;
+ }
+
+ @Override
+ protected void endBlock() {
+ if (!isLeadingStreamer() && !isParityStreamer()) {
+ //before retrieving a new block, transfer the finished block to
+ //leading streamer
+ LocatedBlock finishedBlock = new LocatedBlock(
+ new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
+ block.getNumBytes(),block.getGenerationStamp()), null);
+ try {
+ boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+ TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ //TODO: Handle InterruptedException (HDFS-7786)
+ }
+ }
+ super.endBlock();
+ }
+
+ /**
+ * This function is called after the streamer is closed.
+ */
+ void countTailingBlockGroupBytes () throws IOException {
+ if (isLeadingStreamer()) {
+ // When committing a block group, the leading streamer has to adjust
+ // {@link #block} to include the sizes of the other blocks in the group.
+ for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+ try {
+ LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+ TimeUnit.SECONDS);
+ if (finishedLocatedBlock == null) {
+ throw new IOException("Failed to get a finished LocatedBlock " +
+ "from streamer, i=" + i);
+ }
+ ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+ long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+ if (block != null) {
+ block.setNumBytes(block.getNumBytes() + bytes);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when " +
+ "polling a block from stripedBlocks, ie = " + ie);
+ }
+ }
+ } else if (!isParityStreamer()) {
+ if (block == null || block.getNumBytes() == 0) {
+ LocatedBlock finishedBlock = new LocatedBlock(null, null);
+ try {
+ boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+ TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ //TODO: Handle InterruptedException (HDFS-7786)
+ ie.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+ throws IOException {
+ LocatedBlock lb = null;
+ if (isLeadingStreamer()) {
+ if(hasCommittedBlock) {
+ // When committing a block group, the leading streamer has to adjust
+ // {@link #block} to include the sizes of the other blocks in the group.
+ for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+ try {
+ LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+ TimeUnit.SECONDS);
+ if (finishedLocatedBlock == null) {
+ throw new IOException("Failed to get a finished LocatedBlock " +
+ "from streamer, i=" + i);
+ }
+ ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+ long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+ if(block != null) {
+ block.setNumBytes(block.getNumBytes() + bytes);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when polling" +
+ " a block from stripedBlocks, ie = " + ie);
+ }
+ }
+ }
+
+ lb = super.locateFollowingBlock(excludedNodes);
+ hasCommittedBlock = true;
+ LocatedBlock[] blocks = unwrapBlockGroup(lb);
+ assert blocks.length == blockGroupSize :
+ "Failed to get a block group from the NameNode: blockGroupSize: " +
+ blockGroupSize + ", blocks.length: " + blocks.length;
+ lb = blocks[0];
+ for (int i = 1; i < blocks.length; i++) {
+ try {
+ boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
+ 90, TimeUnit.SECONDS);
+ if (!offSuccess) {
+ String msg = "Failed to put a block into stripedBlocks. i = " + i;
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ } else {
+ DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+ + ", block: " + blocks[i]);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when putting" +
+ " a block to stripeBlocks, ie = " + ie);
+ }
+ }
+ } else {
+ try {
+ //wait 90 seconds to get a block from the queue
+ lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when retrieving " +
+ "a block from stripeBlocks, ie = " + ie);
+ }
+ }
+ return lb;
+ }
+
+ /**
+ * Generate the individual blocks of a block group from the group's first block.
+ *
+ * @param firstBlockInGroup the first block in a block group
+ * @return the blocks in this group
+ */
+ public static LocatedBlock[] unwrapBlockGroup(
+ final LocatedBlock firstBlockInGroup) {
+ ExtendedBlock eb = firstBlockInGroup.getBlock();
+ DatanodeInfo[] locs = firstBlockInGroup.getLocations();
+ String[] storageIDs = firstBlockInGroup.getStorageIDs();
+ StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
+ Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
+ LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
+ for (int i = 0; i < blocksInGroup.length; i++) {
+ //each block in a group has the same number of bytes and generation stamp
+ ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
+ eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
+ blocksInGroup[i] = new LocatedBlock(extendedBlock,
+ new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
+ new StorageType[] {storageTypes[i]});
+ blocksInGroup[i].setBlockToken(blockToken);
+ }
+ return blocksInGroup;
+ }
+}
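
The queue handoff between the leading streamer and the ordinary streamers in locateFollowingBlock above can be hard to follow in isolation. Below is a minimal, self-contained sketch of the same producer/consumer pattern; it uses plain strings and placeholder names rather than HDFS classes, and the 9-entry group and 90-second timeout simply mirror the values used in the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Placeholder sketch of the leading/ordinary streamer handoff. The leading
// streamer (index 0) obtains a block group, keeps the first block for itself,
// and offers one block to each ordinary streamer's queue; each ordinary
// streamer polls its own queue for the block it should write to.
public class BlockHandoffSketch {
  public static void main(String[] args) throws InterruptedException {
    final int groupSize = 9;  // assumed 6 data + 3 parity
    List<BlockingQueue<String>> queues = new ArrayList<>();
    for (int i = 0; i < groupSize; i++) {
      queues.add(new LinkedBlockingQueue<String>(groupSize));
    }

    // Leading streamer: "unwrap" the group and distribute blocks 1..n-1.
    for (int i = 1; i < groupSize; i++) {
      queues.get(i).offer("block-" + i, 90, TimeUnit.SECONDS);
    }

    // An ordinary streamer (say index 3) waits on its queue for its block.
    String myBlock = queues.get(3).poll(90, TimeUnit.SECONDS);
    System.out.println("streamer 3 got " + myBlock);  // streamer 3 got block-3
  }
}
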
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
new file mode 100644
index 0000000..f5a37f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -0,0 +1,311 @@
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDFSStripedOutputStream {
+ private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+ private MiniDFSCluster cluster;
+ private Configuration conf = new Configuration();
+ private DistributedFileSystem fs;
+ int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ int blockSize = 8 * 1024 * 1024;
+ int cellsInBlock = blockSize / cellSize;
+ private int mod = 29;
+
+ @Before
+ public void setup() throws IOException {
+ int numDNs = dataBlocks + parityBlocks + 2;
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.getFileSystem().getClient().createErasureCodingZone("/");
+ fs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void TestFileEmpty() throws IOException {
+ testOneFile("/EmptyFile", 0);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneCell1() throws IOException {
+ testOneFile("/SmallerThanOneCell", 1);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneCell2() throws IOException {
+ testOneFile("/SmallerThanOneCell", cellSize - 1);
+ }
+
+ @Test
+ public void TestFileEqualsWithOneCell() throws IOException {
+ testOneFile("/EqualsWithOneCell", cellSize);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneStripe1() throws IOException {
+ testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneStripe2() throws IOException {
+ testOneFile("/SmallerThanOneStripe", cellSize + 123);
+ }
+
+ @Test
+ public void TestFileEqualsWithOneStripe() throws IOException {
+ testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
+ }
+
+ @Test
+ public void TestFileMoreThanOneStripe1() throws IOException {
+ testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void TestFileMoreThanOneStripe2() throws IOException {
+ testOneFile("/MoreThanOneStripe2",
+ cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+ + cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void TestFileFullBlockGroup() throws IOException {
+ testOneFile("/FullBlockGroup", blockSize * dataBlocks);
+ }
+
+ //TODO: The following tests will pass after HDFS-8121 is fixed
+// @Test
+ public void TestFileMoreThanABlockGroup1() throws IOException {
+ testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+ }
+
+ // @Test
+ public void TestFileMoreThanABlockGroup2() throws IOException {
+ testOneFile("/MoreThanABlockGroup2",
+ blockSize * dataBlocks * 3
+ + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+ + 123);
+ }
+
+ private int stripeDataSize() {
+ return cellSize * dataBlocks;
+ }
+
+ private byte[] generateBytes(int cnt) {
+ byte[] bytes = new byte[cnt];
+ for (int i = 0; i < cnt; i++) {
+ bytes[i] = getByte(i);
+ }
+ return bytes;
+ }
+
+ private byte getByte(long pos) {
+ return (byte) (pos % mod + 1);
+ }
+
+ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ if (fileLength != writeBytes) {
+ Assert.fail("File Length error: expect=" + writeBytes
+ + ", actual=" + fileLength);
+ }
+
+ DFSStripedInputStream dis = new DFSStripedInputStream(
+ fs.getClient(), src, true);
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = dis.read(0, buf, 0, buf.length);
+ readLen = readLen >= 0 ? readLen : 0;
+ if (readLen != writeBytes) {
+ Assert.fail("The length of file is not correct.");
+ }
+
+ for (int i = 0; i < writeBytes; i++) {
+ if (getByte(i) != buf[i]) {
+ Assert.fail("Byte at i = " + i + " is wrongly written.");
+ }
+ }
+ }
+
+ private void testOneFile(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+
+ int allBlocks = dataBlocks + parityBlocks;
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ if (fileLength != writeBytes) {
+ Assert.fail("File Length error: expect=" + writeBytes
+ + ", actual=" + fileLength);
+ }
+
+ List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+ LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
+
+ for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+ LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
+ List<LocatedBlock> oneGroup = Arrays.asList(blocks);
+ blockGroupList.add(oneGroup);
+ }
+
+ //test each block group
+ for (int group = 0; group < blockGroupList.size(); group++) {
+ //get the data of this block
+ List<LocatedBlock> blockList = blockGroupList.get(group);
+ byte[][] dataBlockBytes = new byte[dataBlocks][];
+ byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+
+ //calculate the size of this block group
+ int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
+ blockSize * dataBlocks :
+ writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
+ int intactStripes = lenOfBlockGroup / stripeDataSize();
+ int lastStripeLen = lenOfBlockGroup % stripeDataSize();
+
+ //for each block, use BlockReader to read data
+ for (int i = 0; i < blockList.size(); i++) {
+ LocatedBlock lblock = blockList.get(i);
+ if (lblock == null) {
+ continue;
+ }
+ DatanodeInfo[] nodes = lblock.getLocations();
+ ExtendedBlock block = lblock.getBlock();
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+ nodes[0].getXferAddr());
+
+ int lenOfCell = cellSize;
+ if (i == lastStripeLen / cellSize) {
+ lenOfCell = lastStripeLen % cellSize;
+ } else if (i > lastStripeLen / cellSize) {
+ lenOfCell = 0;
+ }
+ int lenOfBlock = cellSize * intactStripes + lenOfCell;
+ byte[] blockBytes = new byte[lenOfBlock];
+ if (i < dataBlocks) {
+ dataBlockBytes[i] = blockBytes;
+ } else {
+ parityBlockBytes[i - dataBlocks] = blockBytes;
+ }
+
+ if (lenOfBlock == 0) {
+ continue;
+ }
+
+ block.setNumBytes(lenOfBlock);
+ BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+ setFileName(src).
+ setBlock(block).
+ setBlockToken(lblock.getBlockToken()).
+ setInetSocketAddress(targetAddr).
+ setStartOffset(0).
+ setLength(block.getNumBytes()).
+ setVerifyChecksum(true).
+ setClientName("TestStripeLayoutWrite").
+ setDatanodeInfo(nodes[0]).
+ setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+ setClientCacheContext(ClientContext.getFromConf(conf)).
+ setConfiguration(conf).
+ setRemotePeerFactory(new RemotePeerFactory() {
+ @Override
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken,
+ DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ try {
+ sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+ sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+ peer = TcpPeerServer.peerFromSocket(sock);
+ } finally {
+ if (peer == null) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ return peer;
+ }
+ }).build();
+
+ blockReader.readAll(blockBytes, 0, lenOfBlock);
+ blockReader.close();
+ }
+
+ //check if we write the data correctly
+ for (int i = 0; i < dataBlockBytes.length; i++) {
+ byte[] cells = dataBlockBytes[i];
+ if (cells == null) {
+ continue;
+ }
+ for (int j = 0; j < cells.length; j++) {
+ byte expected;
+ //calculate the position of this byte in the file
+ long pos = group * dataBlocks * blockSize
+ + (i * cellSize + j / cellSize * cellSize * dataBlocks)
+ + j % cellSize;
+ if (pos >= writeBytes) {
+ expected = 0;
+ } else {
+ expected = getByte(pos);
+ }
+
+ if (expected != cells[j]) {
+ Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+ + ". Block group index is " + group +
+ ", stripe index is " + j / cellSize +
+ ", cell index is " + i + ", byte index is " + j % cellSize);
+ }
+ }
+ }
+ }
+ }
+
+}
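
The verification loop in testOneFile above reconstructs, for each byte j of each data block i in block group `group`, the file position it came from. A quick arithmetic check of that formula with toy sizes (assumed values, not the test's configuration) may make the mapping easier to follow.

// Standalone check of the offset formula used in the verification loop above:
//   pos = group * dataBlocks * blockSize
//       + i * cellSize + (j / cellSize) * cellSize * dataBlocks
//       + j % cellSize
// where i is the block index inside the group and j is the byte offset in that block.
// The sizes below are toy values chosen only to keep the arithmetic small.
public class StripeOffsetCheck {
  public static void main(String[] args) {
    int cellSize = 4, dataBlocks = 3, blockSize = 8;  // two cells per block
    int group = 0, i = 1, j = 5;                      // block 1, byte 5 of it
    long pos = group * dataBlocks * blockSize
        + i * cellSize + (j / cellSize) * cellSize * dataBlocks
        + j % cellSize;
    // Byte 5 of block 1 is byte 1 of its second cell; that cell starts at
    // file offset cellSize * dataBlocks + i * cellSize = 12 + 4 = 16.
    System.out.println(pos);  // 17
  }
}
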
[02/12] hadoop git commit: HDFS-8023. Erasure Coding: retrieve erasure
coding schema for a file from NameNode (Contributed by Vinayakumar B) Added
missed file
Posted by zh...@apache.org.
HDFS-8023. Erasure Coding: retrieve erasure coding schema for a file from NameNode (Contributed by Vinayakumar B)
Added missed file
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75b32770
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75b32770
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75b32770
Branch: refs/heads/HDFS-7285
Commit: 75b32770fbc25f1c41dc7212966511ad5d7a481a
Parents: c11a02c
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Apr 8 14:23:03 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:18 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/protocol/ECInfo.java | 41 ++++++++++++++++++++
1 file changed, 41 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b32770/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
new file mode 100644
index 0000000..ca642c2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+/**
+ * Class to provide information, such as ECSchema, for a file/block.
+ */
+public class ECInfo {
+ private final String src;
+ private final ECSchema schema;
+
+ public ECInfo(String src, ECSchema schema) {
+ this.src = src;
+ this.schema = schema;
+ }
+
+ public String getSrc() {
+ return src;
+ }
+
+ public ECSchema getSchema() {
+ return schema;
+ }
+}
[07/12] hadoop git commit: HDFS-8074 Define a system-wide default EC
schema. Contributed by Kai Zheng
Posted by zh...@apache.org.
HDFS-8074 Define a system-wide default EC schema. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc11b5c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc11b5c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc11b5c2
Branch: refs/heads/HDFS-7285
Commit: cc11b5c2230ef10023066ed6c006fcd46af328bb
Parents: 75b3277
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Apr 9 01:30:02 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700
----------------------------------------------------------------------
.../src/main/conf/ecschema-def.xml | 5 --
.../apache/hadoop/io/erasurecode/ECSchema.java | 57 +++++++++++++++++-
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 4 +-
.../hdfs/server/namenode/ECSchemaManager.java | 62 ++++++++++++++++++++
4 files changed, 120 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml b/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
index e619485..e36d386 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
+++ b/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
@@ -27,11 +27,6 @@ You can modify and remove those not used yet, or add new ones.
-->
<schemas>
- <schema name="RS-6-3">
- <k>6</k>
- <m>3</m>
- <codec>RS</codec>
- </schema>
<schema name="RS-10-4">
<k>10</k>
<m>4</m>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 27be00e..8c3310e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -23,12 +23,12 @@ import java.util.Map;
/**
* Erasure coding schema to housekeeper relevant information.
*/
-public class ECSchema {
+public final class ECSchema {
public static final String NUM_DATA_UNITS_KEY = "k";
public static final String NUM_PARITY_UNITS_KEY = "m";
public static final String CODEC_NAME_KEY = "codec";
public static final String CHUNK_SIZE_KEY = "chunkSize";
- public static final int DEFAULT_CHUNK_SIZE = 64 * 1024; // 64K
+ public static final int DEFAULT_CHUNK_SIZE = 256 * 1024; // 256K
private String schemaName;
private String codecName;
@@ -82,6 +82,18 @@ public class ECSchema {
}
/**
+ * Constructor with key parameters provided.
+ * @param schemaName
+ * @param codecName
+ * @param numDataUnits
+ * @param numParityUnits
+ */
+ public ECSchema(String schemaName, String codecName,
+ int numDataUnits, int numParityUnits) {
+ this(schemaName, codecName, numDataUnits, numParityUnits, null);
+ }
+
+ /**
* Constructor with key parameters provided. Note the options may contain
* additional information for the erasure codec to interpret further.
* @param schemaName
@@ -200,4 +212,45 @@ public class ECSchema {
return sb.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ECSchema ecSchema = (ECSchema) o;
+
+ if (numDataUnits != ecSchema.numDataUnits) {
+ return false;
+ }
+ if (numParityUnits != ecSchema.numParityUnits) {
+ return false;
+ }
+ if (chunkSize != ecSchema.chunkSize) {
+ return false;
+ }
+ if (!schemaName.equals(ecSchema.schemaName)) {
+ return false;
+ }
+ if (!codecName.equals(ecSchema.codecName)) {
+ return false;
+ }
+ return options.equals(ecSchema.options);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = schemaName.hashCode();
+ result = 31 * result + codecName.hashCode();
+ result = 31 * result + options.hashCode();
+ result = 31 * result + numDataUnits;
+ result = 31 * result + numParityUnits;
+ result = 31 * result + chunkSize;
+
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 7423033..5078a15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -52,4 +52,6 @@
manage EC zones (Zhe Zhang)
HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
- NameNode (vinayakumarb)
\ No newline at end of file
+ NameNode (vinayakumarb)
+
+ HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
new file mode 100644
index 0000000..b001c57
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+/**
+ * This manages the EC schemas predefined and activated in the system. It loads
+ * predefined schemas from XML and syncs with the ones persisted in the NameNode image.
+ *
+ * This class is instantiated by the FSNamesystem.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public final class ECSchemaManager {
+
+ private static final int DEFAULT_DATA_BLOCKS = 6;
+ private static final int DEFAULT_PARITY_BLOCKS = 3;
+ private static final String DEFAULT_CODEC_NAME = "rs";
+ private static final String DEFAULT_SCHEMA_NAME = "SYS-DEFAULT-RS-6-3";
+
+ private static ECSchema SYS_DEFAULT_SCHEMA = new ECSchema(DEFAULT_SCHEMA_NAME,
+ DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS);
+
+ /**
+ * Get system-wide default EC schema, which can be used by default when no
+ * schema is specified for an EC zone.
+ * @return schema
+ */
+ public static ECSchema getSystemDefaultSchema() {
+ return SYS_DEFAULT_SCHEMA;
+ }
+
+ /**
+ * Tell whether the specified schema is the system default one or not.
+ * @param schema
+ * @return true if it is the system default schema, false otherwise
+ */
+ public static boolean isSystemDefault(ECSchema schema) {
+ if (schema == null) {
+ throw new IllegalArgumentException("Invalid schema parameter");
+ }
+
+ // schema name is the identifier, but for safety we check all properties.
+ return SYS_DEFAULT_SCHEMA.equals(schema);
+ }
+}
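
A minimal usage fragment (a sketch, not part of the patch) showing how the new manager might be consulted. It assumes the four-argument ECSchema constructor added in this commit tolerates omitted options, which ECSchemaManager's own default-schema construction relies on; the RS-10-4 values mirror the remaining entry in ecschema-def.xml and are used here only for contrast.

// Sketch only: look up the system-wide default schema and test other schemas
// against it. Whether RS-10-4 is activated is outside the scope of this patch.
ECSchema systemDefault = ECSchemaManager.getSystemDefaultSchema();
System.out.println(systemDefault.getSchemaName());                   // SYS-DEFAULT-RS-6-3
System.out.println(ECSchemaManager.isSystemDefault(systemDefault));  // true

ECSchema rs104 = new ECSchema("RS-10-4", "rs", 10, 4);
System.out.println(ECSchemaManager.isSystemDefault(rs104));          // false
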
[03/12] hadoop git commit: HDFS-8023. Erasure Coding: retrieve erasure
coding schema for a file from NameNode (Contributed by Vinayakumar B)
Posted by zh...@apache.org.
HDFS-8023. Erasure Coding: retrieve erasure coding schema for a file from NameNode (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c11a02c5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c11a02c5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c11a02c5
Branch: refs/heads/HDFS-7285
Commit: c11a02c5f96bfa9db36abc63b19716e3da7d0735
Parents: ae8f0c1
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Apr 8 12:48:59 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:18 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 5 ++-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 14 ++++++
.../hadoop/hdfs/protocol/ClientProtocol.java | 10 +++++
...tNamenodeProtocolServerSideTranslatorPB.java | 19 ++++++++
.../ClientNamenodeProtocolTranslatorPB.java | 18 ++++++++
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 47 +++++++++++++++++++-
.../hdfs/server/namenode/FSNamesystem.java | 31 +++++++++++++
.../hdfs/server/namenode/NameNodeRpcServer.java | 7 +++
.../src/main/proto/ClientNamenodeProtocol.proto | 10 +++++
.../hadoop-hdfs/src/main/proto/hdfs.proto | 28 ++++++++++++
.../hadoop/hdfs/TestErasureCodingZones.java | 38 +++++++++++++++-
11 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9927ccf..7423033 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -49,4 +49,7 @@
(Hui Zheng via Zhe Zhang)
HDFS-7839. Erasure coding: implement facilities in NameNode to create and
- manage EC zones (Zhe Zhang)
\ No newline at end of file
+ manage EC zones (Zhe Zhang)
+
+ HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
+ NameNode (vinayakumarb)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2ef1d36..b2a69dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -117,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -3091,6 +3092,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ public ECInfo getErasureCodingInfo(String src) throws IOException {
+ checkOpen();
+ TraceScope scope = getPathTraceScope("getErasureCodingInfo", src);
+ try {
+ return namenode.getErasureCodingInfo(src);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class, UnresolvedPathException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
return new DFSInotifyEventInputStream(traceSampler, namenode);
}
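
A short client-side fragment (illustrative only, not taken from the patch) showing how the new call added to DFSClient above might be used. The path is a placeholder, `dfsClient` is assumed to be an already-open DFSClient, and a null return means the file is not erasure coded, as the ClientProtocol javadoc below states.

// Illustrative fragment; path and client setup are assumptions.
ECInfo info = dfsClient.getErasureCodingInfo("/eczone/file");
if (info != null) {
  ECSchema schema = info.getSchema();
  System.out.println("src=" + info.getSrc()
      + ", dataUnits=" + schema.getNumDataUnits()
      + ", parityUnits=" + schema.getNumParityUnits());
}
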
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 8efe344..45d92f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1464,4 +1464,14 @@ public interface ClientProtocol {
*/
@Idempotent
public EventBatchList getEditsFromTxid(long txid) throws IOException;
+
+ /**
+ * Gets the ECInfo for the specified file/directory
+ *
+ * @param src
+ * @return the ECInfo if the file/directory is erasure coded, null otherwise
+ * @throws IOException
+ */
+ @Idempotent
+ public ECInfo getErasureCodingInfo(String src) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index a209539..1493e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -107,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1511,4 +1514,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
+
+ @Override
+ public GetErasureCodingInfoResponseProto getErasureCodingInfo(RpcController controller,
+ GetErasureCodingInfoRequestProto request) throws ServiceException {
+ try {
+ ECInfo ecInfo = server.getErasureCodingInfo(request.getSrc());
+ GetErasureCodingInfoResponseProto.Builder resBuilder = GetErasureCodingInfoResponseProto
+ .newBuilder();
+ if (ecInfo != null) {
+ resBuilder.setECInfo(PBHelper.convertECInfo(ecInfo));
+ }
+ return resBuilder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 43a0322..568da68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -107,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1532,4 +1535,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public ECInfo getErasureCodingInfo(String src) throws IOException {
+ GetErasureCodingInfoRequestProto req = GetErasureCodingInfoRequestProto.newBuilder()
+ .setSrc(src).build();
+ try {
+ GetErasureCodingInfoResponseProto res = rpcProxy.getErasureCodingInfo(null, req);
+ if (res.hasECInfo()) {
+ return PBHelper.convertECInfo(res.getECInfo());
+ }
+ return null;
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index cda708f..afdb302 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -29,7 +29,11 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@@ -76,6 +80,7 @@ import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -112,7 +117,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -146,6 +150,9 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto.StorageState;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto;
@@ -228,6 +235,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -3097,4 +3105,41 @@ public class PBHelper {
setId(context.getReportId()).
build();
}
+
+ public static ECInfo convertECInfo(ECInfoProto ecInfoProto) {
+ return new ECInfo(ecInfoProto.getSrc(),
+ convertECSchema(ecInfoProto.getSchema()));
+ }
+
+ public static ECInfoProto convertECInfo(ECInfo ecInfo) {
+ return ECInfoProto.newBuilder().setSrc(ecInfo.getSrc())
+ .setSchema(convertECSchema(ecInfo.getSchema())).build();
+ }
+
+ public static ECSchema convertECSchema(ECSchemaProto schema) {
+ List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+ Map<String, String> options = new HashMap<>(optionsList.size());
+ for (ECSchemaOptionEntryProto option : optionsList) {
+ options.put(option.getKey(), option.getValue());
+ }
+ // include chunksize in options.
+ options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
+ return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
+ schema.getDataUnits(), schema.getParityUnits(), options);
+ }
+
+ public static ECSchemaProto convertECSchema(ECSchema schema) {
+ ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+ .setSchemaName(schema.getSchemaName())
+ .setCodecName(schema.getCodecName())
+ .setDataUnits(schema.getNumDataUnits())
+ .setParityUnits(schema.getNumParityUnits())
+ .setChunkSize(schema.getChunkSize());
+ Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
+ for (Entry<String, String> entry : entrySet) {
+ builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+ .setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
}
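A note on the converters added above: convertECSchema round-trips the schema between its Java form and the protobuf message, and on deserialization it folds the dedicated chunkSize field back into the options map under ECSchema.CHUNK_SIZE_KEY. A minimal round-trip sketch, assuming only the methods and the ECSchema constructor visible in this patch:

    // build a schema with an empty options map, as the branch does elsewhere
    Map<String, String> options = new HashMap<>();
    ECSchema schema = new ECSchema("RS-6-3", "rs", 6, 3, options);
    ECSchemaProto proto = PBHelper.convertECSchema(schema);   // POJO -> protobuf
    ECSchema restored = PBHelper.convertECSchema(proto);      // protobuf -> POJO
    // 'restored' carries the chunk size as an extra options entry keyed by
    // ECSchema.CHUNK_SIZE_KEY, re-injected by the converter above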
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 79d6f21..4c86bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -180,6 +180,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -261,6 +262,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.Server;
@@ -8138,6 +8140,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
}
+ /**
+ * Get the erasure coding information for the specified src
+ */
+ ECInfo getErasureCodingInfo(String src) throws AccessControlException,
+ UnresolvedLinkException, IOException {
+ checkOperation(OperationCategory.READ);
+ final byte[][] pathComponents = FSDirectory
+ .getPathComponentsForReservedPath(src);
+ final FSPermissionChecker pc = getPermissionChecker();
+ readLock();
+ try {
+ checkOperation(OperationCategory.READ);
+ src = dir.resolvePath(pc, src, pathComponents);
+ final INodesInPath iip = dir.getINodesInPath(src, true);
+ if (isPermissionEnabled) {
+ dir.checkPathAccess(pc, iip, FsAction.READ);
+ }
+ if (dir.getECPolicy(iip)) {
+ // TODO HDFS-8074 and HDFS-7859: fetch the schema from the loaded schemas
+ Map<String, String> options = new HashMap<String, String>();
+ ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
+ return new ECInfo(src, defaultSchema);
+ }
+ } finally {
+ readUnlock();
+ }
+ return null;
+ }
+
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
boolean logRetryCache)
throws IOException {
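For context, the new read path follows the usual FSNamesystem pattern (checkOperation, readLock, reserved-path resolution, optional permission check) and, until HDFS-8074/HDFS-7859 land, always answers with the hardcoded RS-6-3 default schema for paths inside an EC zone. A client reaches it roughly like this (a sketch using the accessors exercised by the test further down, not the full call chain):

    // assumes 'fs' is a DistributedFileSystem handle, as in the test below
    ECInfo info = fs.getClient().getErasureCodingInfo("/ec/child1");
    if (info == null) {
      // path is outside any erasure coding zone
    } else {
      ECSchema schema = info.getSchema();   // currently the default RS-6-3 schema
    }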
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index ce24662..fe9b901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
@@ -2042,4 +2043,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
namesystem.checkSuperuserPrivilege();
nn.spanReceiverHost.removeSpanReceiver(id);
}
+
+ @Override // ClientNameNodeProtocol
+ public ECInfo getErasureCodingInfo(String src) throws IOException {
+ checkNNStartup();
+ return namesystem.getErasureCodingInfo(src);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 183aff8..9488aed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -721,6 +721,14 @@ message CreateErasureCodingZoneRequestProto {
message CreateErasureCodingZoneResponseProto {
}
+message GetErasureCodingInfoRequestProto {
+ required string src = 1;
+}
+
+message GetErasureCodingInfoResponseProto {
+ optional ECInfoProto ECInfo = 1;
+}
+
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@@ -869,4 +877,6 @@ service ClientNamenodeProtocol {
returns(GetCurrentEditLogTxidResponseProto);
rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
returns(GetEditsFromTxidResponseProto);
+ rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
+ returns(GetErasureCodingInfoResponseProto);
}
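The response message makes ECInfo optional, so a null return from the NameNode (path not in an EC zone) maps onto an absent field. On the client side the PB translator would typically unwrap it along these lines (a sketch only; the translator change is not part of the hunks shown here, and the generated accessor names are assumed from protoc's usual conventions):

    GetErasureCodingInfoResponseProto resp = rpcProxy.getErasureCodingInfo(null, req);
    ECInfo info = resp.hasECInfo()
        ? PBHelper.convertECInfo(resp.getECInfo())
        : null;   // absent field -> no EC zone for this path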
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 67e2058..1314ea0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -620,3 +620,31 @@ message RollingUpgradeStatusProto {
required string blockPoolId = 1;
optional bool finalized = 2 [default = false];
}
+
+/**
+ * ECSchema options entry
+ */
+message ECSchemaOptionEntryProto {
+ required string key = 1;
+ required string value = 2;
+}
+
+/**
+ * ECSchema for erasure coding
+ */
+message ECSchemaProto {
+ required string schemaName = 1;
+ required string codecName = 2;
+ required uint32 dataUnits = 3;
+ required uint32 parityUnits = 4;
+ required uint32 chunkSize = 5;
+ repeated ECSchemaOptionEntryProto options = 6;
+}
+
+/**
+ * ECInfo
+ */
+message ECInfoProto {
+ required string src = 1;
+ required ECSchemaProto schema = 2;
+}
\ No newline at end of file
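The chunk size travels as a dedicated required field rather than as an options entry, which is why PBHelper re-injects it into the options map on the way back. Building the new messages directly looks like this (a sketch; the concrete values mirror the defaults the branch currently hardcodes, and the chunk size value is an assumption, see ECSchema.DEFAULT_CHUNK_SIZE):

    ECSchemaProto schemaProto = ECSchemaProto.newBuilder()
        .setSchemaName("RS-6-3")
        .setCodecName("rs")
        .setDataUnits(6)
        .setParityUnits(3)
        .setChunkSize(256 * 1024)   // assumed default chunk size
        .build();
    ECInfoProto infoProto = ECInfoProto.newBuilder()
        .setSrc("/ec")
        .setSchema(schemaProto)
        .build();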
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
index 49f08eef..bdca915 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -29,8 +31,7 @@ import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class TestErasureCodingZones {
private final int NUM_OF_DATANODES = 3;
@@ -148,4 +149,37 @@ public class TestErasureCodingZones {
"destination have different erasure coding policies", e);
}
}
+
+ @Test
+ public void testGetErasureCodingInfo() throws Exception {
+ String src = "/ec";
+ final Path ecDir = new Path(src);
+ fs.mkdir(ecDir, FsPermission.getDirDefault());
+ // dir ECInfo before creating ec zone
+ assertNull(fs.getClient().getErasureCodingInfo(src));
+ // dir ECInfo after creating ec zone
+ fs.getClient().createErasureCodingZone(src);
+ verifyErasureCodingInfo(src);
+ fs.create(new Path(ecDir, "/child1")).close();
+ // verify for the files in ec zone
+ verifyErasureCodingInfo(src + "/child1");
+ }
+
+ private void verifyErasureCodingInfo(String src) throws IOException {
+ ECInfo ecInfo = fs.getClient().getErasureCodingInfo(src);
+ assertNotNull("ECInfo should have been non-null", ecInfo);
+ assertEquals(src, ecInfo.getSrc());
+ ECSchema schema = ecInfo.getSchema();
+ assertNotNull(schema);
+ assertEquals("Default schema should be returned", "RS-6-3",
+ schema.getSchemaName());
+ assertEquals("Default codec(rs) should be returned", "rs",
+ schema.getCodecName());
+ assertEquals("Default numDataUnits should be used", 6,
+ schema.getNumDataUnits());
+ assertEquals("Default numParityUnits should be used", 3,
+ schema.getNumParityUnits());
+ assertEquals("Default chunkSize should be used",
+ ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
+ }
}
\ No newline at end of file
[09/12] hadoop git commit: HDFS-7936. Erasure coding: resolving
conflicts in the branch when merging trunk changes (this commit mainly
addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.
Posted by zh...@apache.org.
HDFS-7936. Erasure coding: resolving conflicts in the branch when merging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f8992da8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f8992da8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f8992da8
Branch: refs/heads/HDFS-7285
Commit: f8992da86d8f15241e426fe7a5175e17b08680cf
Parents: 0656213
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 10:56:24 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSInputStream.java | 4 ++--
.../apache/hadoop/hdfs/DFSStripedInputStream.java | 16 +++++++++-------
.../apache/hadoop/hdfs/DFSStripedOutputStream.java | 3 ++-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 5 +++--
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 3 ++-
5 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e94c41a..6006693 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1099,7 +1099,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
- actualGetFromOneDataNode(datanode, block, start, end, buf,
+ actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
new int[]{offset}, new int[]{length}, corruptedBlockMap);
}
@@ -1118,7 +1118,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long startInBlk, final long endInBlk,
+ long blockStartOffset, final long startInBlk, final long endInBlk,
byte[] buf, int[] offsets, int[] lengths,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 077b0f8..8a431b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -224,7 +224,7 @@ public class DFSStripedInputStream extends DFSInputStream {
* Real implementation of pread.
*/
@Override
- protected void fetchBlockByteRange(LocatedBlock block, long start,
+ protected void fetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -234,7 +234,7 @@ public class DFSStripedInputStream extends DFSInputStream {
int len = (int) (end - start + 1);
// Refresh the striped block group
- block = getBlockGroupAt(block.getStartOffset());
+ LocatedBlock block = getBlockGroupAt(blockStartOffset);
assert block instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
@@ -254,9 +254,11 @@ public class DFSStripedInputStream extends DFSInputStream {
DatanodeInfo loc = blks[i].getLocations()[0];
StorageType type = blks[i].getStorageTypes()[0];
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
- loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
- Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
- rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+ loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+ type);
+ Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+ blks[i].getStartOffset(), rp.startOffsetInBlock,
+ rp.startOffsetInBlock + rp.readLength - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@@ -272,7 +274,7 @@ public class DFSStripedInputStream extends DFSInputStream {
}
private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
- final LocatedBlock block, final long start, final long end,
+ final long blockStartOffset, final long start, final long end,
final byte[] buf, final int[] offsets, final int[] lengths,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
@@ -283,7 +285,7 @@ public class DFSStripedInputStream extends DFSInputStream {
TraceScope scope =
Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
try {
- actualGetFromOneDataNode(datanode, block, start,
+ actualGetFromOneDataNode(datanode, blockStartOffset, start,
end, buf, offsets, lengths, corruptedBlockMap);
} finally {
scope.close();
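The signature change above (LocatedBlock -> blockStartOffset) means each parallel read task re-derives what it needs from the offset instead of holding a LocatedBlock reference that may have been refreshed underneath it. Stripped of HDFS specifics, the submission pattern is a fan-out over an executor (a condensed sketch with simplified and hypothetical helper names, not the actual class members; assume it sits in a method declaring the checked exceptions):

    ExecutorService pool = Executors.newFixedThreadPool(numDataBlocks);
    List<Future<Void>> reads = new ArrayList<>();
    for (LocatedBlock blk : blockGroupBlocks) {
      final long blkStartOffset = blk.getStartOffset();
      reads.add(pool.submit(() -> {
        // each task works from the offset, re-resolving the block group as needed
        readOneBlock(blkStartOffset);   // hypothetical per-block read
        return null;
      }));
    }
    for (Future<Void> f : reads) {
      f.get();   // surfaces the first read failure, if any
    }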
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index aded4fe..1d0e1be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -284,7 +284,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
for (StripedDataStreamer streamer : streamers) {
streamer.setLastException(new IOException("Lease timeout of "
- + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+ + (dfsClient.getConf().getHdfsTimeout()/1000) +
+ " seconds expired."));
}
closeThreads(true);
dfsClient.endFileLease(fileId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a504907..6d89b0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3067,7 +3067,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final long blockSize;
final short numTargets;
final byte storagePolicyID;
- final boolean isStriped;
Node clientNode = null;
String clientMachine = null;
@@ -3109,7 +3108,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
clientMachine);
// TODO: make block group size configurable (HDFS-7337)
- isStriped = pendingFile.isStriped();
+ boolean isStriped = pendingFile.isStriped();
numTargets = isStriped ?
HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
pendingFile.getFileReplication();
@@ -3138,6 +3137,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
Block newBlock = null;
long offset;
+ boolean isStriped;
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
@@ -3168,6 +3168,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
commitOrCompleteLastBlock(pendingFile, fileState.iip,
ExtendedBlock.getLocalBlock(previous));
+ isStriped = pendingFile.isStriped();
// allocate new block, record block locations in INode.
newBlock = createNewBlock(isStriped);
saveAllocatedBlock(src, fileState.iip, newBlock, targets,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index f5a37f3..ee6998b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -241,7 +242,7 @@ public class TestDFSStripedOutputStream {
}
block.setNumBytes(lenOfBlock);
- BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+ BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setFileName(src).
setBlock(block).
setBlockToken(lblock.getBlockToken()).