Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:32 UTC
[32/50] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 7a7cd24,0000000..dabae2c
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@@ -1,1013 -1,0 +1,1014 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSPacket;
+import org.apache.hadoop.hdfs.DFSUtil;
++import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
+
+/**
+ * ErasureCodingWorker handles the erasure coding recovery work commands. These
+ * commands are issued by the NameNode as part of the DataNode's heartbeat
+ * response. BPOfferService delegates the work to this class for handling EC
+ * commands.
+ */
+public final class ErasureCodingWorker {
+ private static final Log LOG = DataNode.LOG;
+
+ private final DataNode datanode;
+ private final Configuration conf;
+
+ private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
+ private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
+ private final int STRIPED_READ_TIMEOUT_MILLIS;
+ private final int STRIPED_READ_BUFFER_SIZE;
+
+ public ErasureCodingWorker(Configuration conf, DataNode datanode) {
+ this.datanode = datanode;
+ this.conf = conf;
+
+ STRIPED_READ_TIMEOUT_MILLIS = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+ initializeStripedReadThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
+ STRIPED_READ_BUFFER_SIZE = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+ initializeStripedBlkRecoveryThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
+ }
+
+ private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
+ return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
+ }
+
+ private void initializeStripedReadThreadPool(int num) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using striped reads; pool threads=" + num);
+ }
+ STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedRead-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+ LOG.info("Execution for striped reading rejected, "
+ + "Executing in current thread");
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+
+ private void initializeStripedBlkRecoveryThreadPool(int num) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using striped block recovery; pool threads=" + num);
+ }
+ STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIdx = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
+ return t;
+ }
+ });
+ STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+
+ /**
+ * Handles the erasure coding recovery work commands.
+ *
+ * @param ecTasks
+ * collection of BlockECRecoveryInfo tasks to process
+ */
+ public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
+ for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
+ try {
+ STRIPED_BLK_RECOVERY_THREAD_POOL
+ .submit(new ReconstructAndTransferBlock(recoveryInfo));
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block "
+ + recoveryInfo.getExtendedBlock().getLocalBlock(), e);
+ }
+ }
+ }
+
+ /**
+ * ReconstructAndTransferBlock recovers one or more missing striped blocks
+ * in a striped block group; the number of live striped blocks must be no
+ * less than the number of data blocks.
+ *
+ * | <- Striped Block Group -> |
+ * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group
+ * | | | |
+ * v v v v
+ * +------+ +------+ +------+ +------+
+ * |cell_0| |cell_1| |cell_2| |cell_3| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_4| |cell_5| |cell_6| |cell_7| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_8| |cell_9| |cell10| |cell11| ...
+ * +------+ +------+ +------+ +------+
+ * ... ... ... ...
+ *
+ *
+ * We use the following steps to recover a striped block group; in each
+ * round we recover <code>bufferSize</code> bytes of data until done. The
+ * <code>bufferSize</code> is configurable and may be smaller or larger
+ * than the cell size:
+ * step1: read <code>bufferSize</code> bytes from the minimum number of
+ * sources required by recovery.
+ * step2: decode data for targets.
+ * step3: transfer data to targets.
+ *
+ * In step1, we try to read <code>bufferSize</code> bytes from the minimum
+ * number of sources; if a source is corrupt or stale, a read from a new
+ * source is scheduled. The best sources are remembered for the next round
+ * and may be updated in each round.
+ *
+ * In step2, typically if the source blocks we read are all data blocks we
+ * need to call encode, and if at least one parity block is among them we
+ * need to call decode. Note that we read only once and recover all missing
+ * striped blocks, even if there is more than one.
+ *
+ * In step3, we send the recovered data to the targets by constructing
+ * packets and sending them directly. As with contiguous block replication,
+ * we don't check the packet acks. Since the datanode doing the recovery
+ * work is one of the source datanodes, the recovered data is sent
+ * remotely.
+ *
+ * There are some points where we can make further improvements in the
+ * next phase:
+ * 1. We could read the block file directly on the local datanode;
+ * currently we use a remote block reader. (Note that short-circuit reads
+ * are not a good choice; see the inline comments.)
+ * 2. Should we check the packet acks for EC recovery? Since EC recovery
+ * is more expensive than contiguous block replication and needs to read
+ * from several other datanodes, should we make sure the recovered result
+ * is received by the targets?
+ */
+ private class ReconstructAndTransferBlock implements Runnable {
+ private final int dataBlkNum;
+ private final int parityBlkNum;
+ private final int cellSize;
+
+ private RawErasureDecoder decoder;
+
+ // Striped read buffer size
+ private int bufferSize;
+
+ private final ExtendedBlock blockGroup;
+ private final int minRequiredSources;
+ // position in striped internal block
+ private long positionInBlock;
+
+ // sources
+ private final short[] liveIndices;
+ private final DatanodeInfo[] sources;
+
+ private final List<StripedReader> stripedReaders;
+
+ // The buffers and indices for striped blocks whose length is 0
+ private ByteBuffer[] zeroStripeBuffers;
+ private short[] zeroStripeIndices;
+
+ // targets
+ private final DatanodeInfo[] targets;
+ private final StorageType[] targetStorageTypes;
+
+ private final short[] targetIndices;
+ private final ByteBuffer[] targetBuffers;
+
+ private final Socket[] targetSockets;
+ private final DataOutputStream[] targetOutputStreams;
+ private final DataInputStream[] targetInputStreams;
+
+ private final long[] blockOffset4Targets;
+ private final long[] seqNo4Targets;
+
+ private final static int WRITE_PACKET_SIZE = 64 * 1024;
+ private DataChecksum checksum;
+ private int maxChunksPerPacket;
+ private byte[] packetBuf;
+ private byte[] checksumBuf;
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ private final CachingStrategy cachingStrategy;
+
+ private final Map<Future<Void>, Integer> futures = new HashMap<>();
+ private final CompletionService<Void> readService =
+ new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
+
+ ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
+ ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy();
+ dataBlkNum = ecPolicy.getNumDataUnits();
+ parityBlkNum = ecPolicy.getNumParityUnits();
+ cellSize = ecPolicy.getCellSize();
+
+ blockGroup = recoveryInfo.getExtendedBlock();
+ final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
+ minRequiredSources = Math.min(cellsNum, dataBlkNum);
+
+ liveIndices = recoveryInfo.getLiveBlockIndices();
+ sources = recoveryInfo.getSourceDnInfos();
+ stripedReaders = new ArrayList<>(sources.length);
+
+ Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
+ "No enough live striped blocks.");
+ Preconditions.checkArgument(liveIndices.length == sources.length,
+ "liveBlockIndices and source dns should match");
+
+ if (minRequiredSources < dataBlkNum) {
+ zeroStripeBuffers =
+ new ByteBuffer[dataBlkNum - minRequiredSources];
+ zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
+ }
+
+ targets = recoveryInfo.getTargetDnInfos();
+ targetStorageTypes = recoveryInfo.getTargetStorageTypes();
+ targetIndices = new short[targets.length];
+ targetBuffers = new ByteBuffer[targets.length];
+
+ Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+ "Too much missed striped blocks.");
+
+ targetSockets = new Socket[targets.length];
+ targetOutputStreams = new DataOutputStream[targets.length];
+ targetInputStreams = new DataInputStream[targets.length];
+
+ blockOffset4Targets = new long[targets.length];
+ seqNo4Targets = new long[targets.length];
+
+ for (int i = 0; i < targets.length; i++) {
+ blockOffset4Targets[i] = 0;
+ seqNo4Targets[i] = 0;
+ }
+
+ getTargetIndices();
+ cachingStrategy = CachingStrategy.newDefaultStrategy();
+ }
+
+ private ByteBuffer allocateBuffer(int length) {
+ return ByteBuffer.allocate(length);
+ }
+
+ private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
+ dataBlkNum, i);
+ }
+
+ private long getBlockLen(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
+ cellSize, dataBlkNum, i);
+ }
+
+ /**
+ * A StripedReader is used to read from one source DN; it contains a block
+ * reader, a buffer and the striped block index.
+ * A StripedReader is allocated only once per source, and the readers keep
+ * the same array order as the sources. Typically we only need to allocate
+ * the minimum number (minRequiredSources) of StripedReaders, and we
+ * allocate a new one for a new source DN if an existing DN is invalid or
+ * slow.
+ * If a source DN is corrupt, the corresponding blockReader is set to null
+ * and is never read from again.
+ *
+ * @param i the array index of sources
+ * @param offsetInBlock offset for the internal block
+ * @return StripedReader
+ */
+ private StripedReader addStripedReader(int i, long offsetInBlock) {
+ StripedReader reader = new StripedReader(liveIndices[i]);
+ stripedReaders.add(reader);
+
+ BlockReader blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+ if (blockReader != null) {
+ initChecksumAndBufferSizeIfNeeded(blockReader);
+ reader.blockReader = blockReader;
+ }
+ reader.buffer = allocateBuffer(bufferSize);
+ return reader;
+ }
+
+ @Override
+ public void run() {
+ datanode.incrementXmitsInProgress();
+ try {
+ // Store the array indices of source DNs we have read successfully.
+ // In each read iteration the success list may be updated if some
+ // source DN is corrupted or slow, and the updated list is used for
+ // the next iteration.
+ int[] success = new int[minRequiredSources];
+
+ int nsuccess = 0;
+ for (int i = 0;
+ i < sources.length && nsuccess < minRequiredSources; i++) {
+ StripedReader reader = addStripedReader(i, 0);
+ if (reader.blockReader != null) {
+ success[nsuccess++] = i;
+ }
+ }
+
+ if (nsuccess < minRequiredSources) {
+ String error = "Can't find minimum sources required by "
+ + "recovery, block id: " + blockGroup.getBlockId();
+ throw new IOException(error);
+ }
+
+ if (zeroStripeBuffers != null) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ zeroStripeBuffers[i] = allocateBuffer(bufferSize);
+ }
+ }
+
+ for (int i = 0; i < targets.length; i++) {
+ targetBuffers[i] = allocateBuffer(bufferSize);
+ }
+
+ checksumSize = checksum.getChecksumSize();
+ int chunkSize = bytesPerChecksum + checksumSize;
+ maxChunksPerPacket = Math.max(
+ (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
+ int maxPacketSize = chunkSize * maxChunksPerPacket
+ + PacketHeader.PKT_MAX_HEADER_LEN;
+
+ packetBuf = new byte[maxPacketSize];
+ checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
+
+ // targetsStatus stores whether each target has succeeded; once a target
+ // has failed (invalid DN or transfer failure), no more data is
+ // transferred to it.
+ boolean[] targetsStatus = new boolean[targets.length];
+ if (initTargetStreams(targetsStatus) == 0) {
+ String error = "All targets are failed.";
+ throw new IOException(error);
+ }
+
+ long firstStripedBlockLength = getBlockLen(blockGroup, 0);
+ while (positionInBlock < firstStripedBlockLength) {
+ int toRead = Math.min(
+ bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+ // step1: read from minimum source DNs required for reconstruction.
+ // The returned success list is the source DNs we do real read from
+ success = readMinimumStripedData4Recovery(success);
+
+ // step2: decode to reconstruct targets
+ long remaining = firstStripedBlockLength - positionInBlock;
+ int toRecoverLen = remaining < bufferSize ?
+ (int)remaining : bufferSize;
+ recoverTargets(success, targetsStatus, toRecoverLen);
+
+ // step3: transfer data
+ if (transferData2Targets(targetsStatus) == 0) {
+ String error = "Transfer failed for all targets.";
+ throw new IOException(error);
+ }
+
+ clearBuffers();
+ positionInBlock += toRead;
+ }
+
+ endTargetBlocks(targetsStatus);
+
+ // Currently we don't check the acks for packets; this is similar to
+ // block replication.
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block: " + blockGroup, e);
+ } finally {
+ datanode.decrementXmitsInProgress();
+ // close block readers
+ for (StripedReader stripedReader : stripedReaders) {
+ closeBlockReader(stripedReader.blockReader);
+ }
+ for (int i = 0; i < targets.length; i++) {
+ IOUtils.closeStream(targetOutputStreams[i]);
+ IOUtils.closeStream(targetInputStreams[i]);
+ IOUtils.closeStream(targetSockets[i]);
+ }
+ }
+ }
+
+ // init checksum from block reader
+ private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
+ if (checksum == null) {
+ checksum = blockReader.getDataChecksum();
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ // The bufferSize is a multiple of bytesPerChecksum (at least one chunk)
+ int readBufferSize = STRIPED_READ_BUFFER_SIZE;
+ bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
+ readBufferSize - readBufferSize % bytesPerChecksum;
+ } else {
+ assert blockReader.getDataChecksum().equals(checksum);
+ }
+ }
+
+ private void getTargetIndices() {
+ BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
+ for (int i = 0; i < sources.length; i++) {
+ bitset.set(liveIndices[i]);
+ }
+ int m = 0;
+ int k = 0;
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (!bitset.get(i)) {
+ if (getBlockLen(blockGroup, i) > 0) {
+ if (m < targets.length) {
+ targetIndices[m++] = (short)i;
+ }
+ } else {
+ zeroStripeIndices[k++] = (short)i;
+ }
+ }
+ }
+ }
+
+ private long getReadLength(int index) {
+ long blockLen = getBlockLen(blockGroup, index);
+ long remaining = blockLen - positionInBlock;
+ return remaining > bufferSize ? bufferSize : remaining;
+ }
+
+ /**
+ * Read from the minimum number of source DNs required for reconstruction
+ * in this iteration. First try the success list, which we believe holds
+ * the best DNs. If a source DN is corrupt or slow, try to read from some
+ * other source DN and update the success list.
+ *
+ * The updated success list is remembered and returned for subsequent
+ * operations and the next read iteration.
+ *
+ * @param success the initial success list of source DNs we think are best
+ * @return the updated success list of source DNs we actually read from
+ * @throws IOException
+ */
+ private int[] readMinimumStripedData4Recovery(final int[] success)
+ throws IOException {
+ int nsuccess = 0;
+ int[] newSuccess = new int[minRequiredSources];
+ BitSet used = new BitSet(sources.length);
+ /*
+ * Read from the minimum number of source DNs required; the success list
+ * contains the source DNs we think are best.
+ */
+ for (int i = 0; i < minRequiredSources; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ if (getReadLength(liveIndices[success[i]]) > 0) {
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, success[i]);
+ } else {
+ // If the read length is 0, we don't need to do real read
+ reader.buffer.position(0);
+ newSuccess[nsuccess++] = success[i];
+ }
+ used.set(success[i]);
+ }
+
+ while (!futures.isEmpty()) {
+ try {
+ StripingChunkReadResult result =
+ StripedBlockUtil.getNextCompletedStripedRead(
+ readService, futures, STRIPED_READ_TIMEOUT_MILLIS);
+ int resultIndex = -1;
+ if (result.state == StripingChunkReadResult.SUCCESSFUL) {
+ resultIndex = result.index;
+ } else if (result.state == StripingChunkReadResult.FAILED) {
+ // If read failed for some source DN, we should not use it anymore
+ // and schedule read from another source DN.
+ StripedReader failedReader = stripedReaders.get(result.index);
+ closeBlockReader(failedReader.blockReader);
+ failedReader.blockReader = null;
+ resultIndex = scheduleNewRead(used);
+ } else if (result.state == StripingChunkReadResult.TIMEOUT) {
+ // If timeout, we also schedule a new read.
+ resultIndex = scheduleNewRead(used);
+ }
+ if (resultIndex >= 0) {
+ newSuccess[nsuccess++] = resultIndex;
+ if (nsuccess >= minRequiredSources) {
+ // cancel remaining reads if we read successfully from minimum
+ // number of source DNs required by reconstruction.
+ cancelReads(futures.keySet());
+ futures.clear();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Read data interrupted.", e);
+ break;
+ }
+ }
+
+ if (nsuccess < minRequiredSources) {
+ String error = "Can't read data from minimum number of sources "
+ + "required by reconstruction, block id: " + blockGroup.getBlockId();
+ throw new IOException(error);
+ }
+
+ return newSuccess;
+ }
+
+ private void paddingBufferToLen(ByteBuffer buffer, int len) {
+ int toPadding = len - buffer.position();
+ for (int i = 0; i < toPadding; i++) {
+ buffer.put((byte) 0);
+ }
+ }
+
+ // Initialize decoder
+ private void initDecoderIfNecessary() {
+ if (decoder == null) {
+ decoder = newDecoder(dataBlkNum, parityBlkNum);
+ }
+ }
+
+ private int[] getErasedIndices(boolean[] targetsStatus) {
+ int[] result = new int[targets.length];
+ int m = 0;
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ result[m++] = convertIndex4Decode(targetIndices[i],
+ dataBlkNum, parityBlkNum);
+ }
+ }
+ return Arrays.copyOf(result, m);
+ }
+
+ private void recoverTargets(int[] success, boolean[] targetsStatus,
+ int toRecoverLen) {
+ initDecoderIfNecessary();
+ ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+ for (int i = 0; i < success.length; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ ByteBuffer buffer = reader.buffer;
+ paddingBufferToLen(buffer, toRecoverLen);
+ inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] =
+ (ByteBuffer)buffer.flip();
+ }
+ if (success.length < dataBlkNum) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ ByteBuffer buffer = zeroStripeBuffers[i];
+ paddingBufferToLen(buffer, toRecoverLen);
+ int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
+ parityBlkNum);
+ inputs[index] = (ByteBuffer)buffer.flip();
+ }
+ }
+ int[] erasedIndices = getErasedIndices(targetsStatus);
+ ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
+ int m = 0;
+ for (int i = 0; i < targetBuffers.length; i++) {
+ if (targetsStatus[i]) {
+ targetBuffers[i].limit(toRecoverLen);
+ outputs[m++] = targetBuffers[i];
+ }
+ }
+ decoder.decode(inputs, erasedIndices, outputs);
+
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+ long remaining = blockLen - positionInBlock;
+ if (remaining < 0) {
+ targetBuffers[i].limit(0);
+ } else if (remaining < toRecoverLen) {
+ targetBuffers[i].limit((int)remaining);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule a read from some new source DN if an existing DN is corrupted
+ * or slow; this is called from the read iteration.
+ * Initially we may only have <code>minRequiredSources</code>
+ * StripedReaders.
+ * If the position is at the end of the target block, no real read is
+ * needed and the array index of the source DN is returned; otherwise -1.
+ *
+ * @param used the source DNs already used in this iteration.
+ * @return the array index of the source DN if no real read is needed,
+ * otherwise -1.
+ */
+ private int scheduleNewRead(BitSet used) {
+ StripedReader reader = null;
+ // step1: initially we may only have <code>minRequiredSources</code>
+ // number of StripedReaders, and there may be some source DNs we have
+ // never read from before, so we try to create a StripedReader for one
+ // new source DN and read from it. If found, go to step 3.
+ int m = stripedReaders.size();
+ while (reader == null && m < sources.length) {
+ reader = addStripedReader(m, positionInBlock);
+ if (getReadLength(liveIndices[m]) > 0) {
+ if (reader.blockReader == null) {
+ reader = null;
+ m++;
+ }
+ } else {
+ used.set(m);
+ return m;
+ }
+ }
+
+ // step2: if there is no new source DN we can use, try to find a source
+ // DN we have read from before but which, for some reason (e.g., it was
+ // slow), is not in the success DN list at the beginning of this
+ // iteration, so we have not tried it yet. Now we have a chance to
+ // revisit it.
+ for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
+ if (!used.get(i)) {
+ StripedReader r = stripedReaders.get(i);
+ if (getReadLength(liveIndices[i]) > 0) {
+ closeBlockReader(r.blockReader);
+ r.blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), positionInBlock,
+ sources[i]);
+ if (r.blockReader != null) {
+ m = i;
+ reader = r;
+ }
+ } else {
+ used.set(i);
+ r.buffer.position(0);
+ return i;
+ }
+ }
+ }
+
+ // step3: schedule the read if we found a usable source DN and a real read is needed.
+ if (reader != null) {
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, m);
+ used.set(m);
+ }
+
+ return -1;
+ }
+
+ // cancel all reads.
+ private void cancelReads(Collection<Future<Void>> futures) {
+ for (Future<Void> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ private Callable<Void> readFromBlock(final BlockReader reader,
+ final ByteBuffer buf) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ actualReadFromBlock(reader, buf);
+ return null;
+ } catch (IOException e) {
+ LOG.info(e.getMessage());
+ throw e;
+ }
+ }
+
+ };
+ }
+
+ /**
+ * Read bytes from block
+ */
+ private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
+ throws IOException {
+ int len = buf.remaining();
+ int n = 0;
+ while (n < len) {
+ int nread = reader.read(buf);
+ if (nread <= 0) {
+ break;
+ }
+ n += nread;
+ }
+ }
+
+ // close block reader
+ private void closeBlockReader(BlockReader blockReader) {
+ try {
+ if (blockReader != null) {
+ blockReader.close();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+ return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+ datanode.getDnConf().getConnectToDnViaHostname()));
+ }
+
+ private BlockReader newBlockReader(final ExtendedBlock block,
+ long offsetInBlock, DatanodeInfo dnInfo) {
+ if (offsetInBlock >= block.getNumBytes()) {
+ return null;
+ }
+ try {
+ InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
+ Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
+ block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+ /*
+ * This can be further improved: if the replica is local, we can read
+ * directly from the DN after checking that the replica is in FINALIZED
+ * state. Note that we should not use short-circuit local reads, which
+ * require domain-socket config on UNIX or a legacy config on Windows.
+ */
+ return RemoteBlockReader2.newBlockReader(
+ "dummy", block, blockToken, offsetInBlock,
+ block.getNumBytes() - offsetInBlock, true,
+ "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
+ null, cachingStrategy);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ Socket sock = null;
+ final int socketTimeout = datanode.getDnConf().getSocketTimeout();
+ try {
+ sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ NetUtils.connect(sock, addr, socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(),
++ peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
+ sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
+ blockToken, datanodeId);
+ peer.setReadTimeout(socketTimeout);
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ IOUtils.cleanup(LOG, peer);
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Send data to targets
+ */
+ private int transferData2Targets(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ boolean success = false;
+ try {
+ ByteBuffer buffer = targetBuffers[i];
+
+ if (buffer.remaining() == 0) {
+ continue;
+ }
+
+ checksum.calculateChunkedSums(
+ buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
+
+ int ckOff = 0;
+ while (buffer.remaining() > 0) {
+ DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
+ int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
+ int toWrite = buffer.remaining() > maxBytesToPacket ?
+ maxBytesToPacket : buffer.remaining();
+ int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
+ packet.writeChecksum(checksumBuf, ckOff, ckLen);
+ ckOff += ckLen;
+ packet.writeData(buffer, toWrite);
+
+ // Send packet
+ packet.writeTo(targetOutputStreams[i]);
+
+ blockOffset4Targets[i] += toWrite;
+ nsuccess++;
+ success = true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ targetsStatus[i] = success;
+ }
+ }
+ return nsuccess;
+ }
+
+ /**
+ * clear all buffers
+ */
+ private void clearBuffers() {
+ for (StripedReader stripedReader : stripedReaders) {
+ if (stripedReader.buffer != null) {
+ stripedReader.buffer.clear();
+ }
+ }
+
+ if (zeroStripeBuffers != null) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ zeroStripeBuffers[i].clear();
+ }
+ }
+
+ for (int i = 0; i < targetBuffers.length; i++) {
+ if (targetBuffers[i] != null) {
+ cleanBuffer(targetBuffers[i]);
+ }
+ }
+ }
+
+ private ByteBuffer cleanBuffer(ByteBuffer buffer) {
+ Arrays.fill(buffer.array(), (byte) 0);
+ return (ByteBuffer)buffer.clear();
+ }
+
+ // send an empty packet to mark the end of the block
+ private void endTargetBlocks(boolean[] targetsStatus) {
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ try {
+ DFSPacket packet = new DFSPacket(packetBuf, 0,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
+ packet.writeTo(targetOutputStreams[i]);
+ targetOutputStreams[i].flush();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize output/input streams for transferring data to target
+ * and send create block request.
+ */
+ private int initTargetStreams(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ Socket socket = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ boolean success = false;
+ try {
+ InetSocketAddress targetAddr =
+ getSocketAddress4Transfer(targets[i]);
+ socket = datanode.newSocket();
+ NetUtils.connect(socket, targetAddr,
+ datanode.getDnConf().getSocketTimeout());
+ socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
+
+ ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
+ Token<BlockTokenIdentifier> blockToken =
+ datanode.getBlockAccessToken(block,
+ EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+ long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
+ OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(socket);
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
+ socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
+
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ DFSUtil.getSmallBufferSize(conf)));
+ in = new DataInputStream(unbufIn);
+
+ DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
+ new Sender(out).writeBlock(block, targetStorageTypes[i],
+ blockToken, "", new DatanodeInfo[]{targets[i]},
+ new StorageType[]{targetStorageTypes[i]}, source,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
+ checksum, cachingStrategy, false, false, null);
+
+ targetSockets[i] = socket;
+ targetOutputStreams[i] = out;
+ targetInputStreams[i] = in;
+ nsuccess++;
+ success = true;
+ } catch (Throwable e) {
+ LOG.warn(e.getMessage());
+ } finally {
+ if (!success) {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(socket);
+ }
+ }
+ targetsStatus[i] = success;
+ }
+ return nsuccess;
+ }
+ }
+
+ private static class StripedReader {
+ private final short index; // internal block index
+ private BlockReader blockReader;
+ private ByteBuffer buffer;
+
+ private StripedReader(short index) {
+ this.index = index;
+ }
+ }
+}
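
The ReconstructAndTransferBlock javadoc above walks through the per-round loop: read bufferSize bytes from the minimum number of live sources, decode the missing internal blocks, then transfer the decoded buffers to the target datanodes. The toy program below is a minimal, self-contained sketch of that read/decode/transfer shape using a single-parity (XOR) code instead of the Reed-Solomon RawErasureDecoder the worker actually creates through CodecUtil.createRSRawDecoder; the class name and the data are purely illustrative.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Toy, single-erasure XOR analogue of ReconstructAndTransferBlock's
// read -> decode -> transfer loop; real recovery uses Reed-Solomon.
public class XorRecoverySketch {
  public static void main(String[] args) {
    final int bufferSize = 4;
    byte[][] blocks = {
        {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}, new byte[bufferSize]};
    // the parity block is the XOR of the three data blocks
    for (int pos = 0; pos < bufferSize; pos++) {
      blocks[3][pos] = (byte) (blocks[0][pos] ^ blocks[1][pos] ^ blocks[2][pos]);
    }

    final int erased = 1;   // pretend blk_1 of the block group was lost

    // step1: "read" one buffer from each surviving source
    ByteBuffer[] inputs = new ByteBuffer[blocks.length];
    for (int i = 0; i < blocks.length; i++) {
      if (i != erased) {
        inputs[i] = ByteBuffer.wrap(blocks[i]);
      }
    }

    // step2: decode the erased buffer (for XOR: XOR all the survivors)
    byte[] target = new byte[bufferSize];
    for (int pos = 0; pos < bufferSize; pos++) {
      byte b = 0;
      for (int i = 0; i < inputs.length; i++) {
        if (i != erased) {
          b ^= inputs[i].get(pos);
        }
      }
      target[pos] = b;
    }

    // step3: "transfer" the recovered buffer; the real worker wraps it in
    // DFSPackets and writes it to the target DataNode's output stream
    System.out.println(Arrays.toString(target));   // prints [5, 6, 7, 8]
  }
}

With a real RS(data, parity) code, step2 becomes the decoder.decode(inputs, erasedIndices, outputs) call shown in recoverTargets() above, and step3 is the packet construction done in transferData2Targets().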
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index a115138,0ae739c..34b28e4
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@@ -39,10 -39,8 +39,9 @@@ import org.apache.hadoop.fs.permission.
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
- import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@@@ -334,21 -331,13 +333,21 @@@ public final class FSImageFormatPBINod
INodeSection.INodeFile f = n.getFile();
List<BlockProto> bp = f.getBlocksList();
short replication = (short) f.getReplication();
+ boolean isStriped = f.getIsStriped();
LoaderContext state = parent.getLoaderContext();
+ ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
BlockInfo[] blocks = new BlockInfo[bp.size()];
- for (int i = 0, e = bp.size(); i < e; ++i) {
- blocks[i] =
- new BlockInfoContiguous(PBHelperClient.convert(bp.get(i)), replication);
+ for (int i = 0; i < bp.size(); ++i) {
+ BlockProto b = bp.get(i);
+ if (isStriped) {
- blocks[i] = new BlockInfoStriped(PBHelper.convert(b), ecPolicy);
++ blocks[i] = new BlockInfoStriped(PBHelperClient.convert(b), ecPolicy);
+ } else {
- blocks[i] = new BlockInfoContiguous(PBHelper.convert(b),
++ blocks[i] = new BlockInfoContiguous(PBHelperClient.convert(b),
+ replication);
+ }
}
+
final PermissionStatus permissions = loadPermission(f.getPermission(),
parent.getLoaderContext().getStringTable());
@@@ -654,11 -632,10 +653,11 @@@
private void save(OutputStream out, INodeFile n) throws IOException {
INodeSection.INodeFile.Builder b = buildINodeFile(n,
parent.getSaverContext());
+ BlockInfo[] blocks = n.getBlocks();
- if (n.getBlocks() != null) {
+ if (blocks != null) {
for (Block block : n.getBlocks()) {
- b.addBlocks(PBHelper.convert(block));
+ b.addBlocks(PBHelperClient.convert(block));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed52ca4,75b6be9..b6b151c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -4714,26 -4654,8 +4713,8 @@@ public class FSNamesystem implements Na
&& safeMode.isOn();
}
- /**
- * Check if replication queues are to be populated
- * @return true when node is HAState.Active and not in the very first safemode
- */
- @Override
- public boolean isPopulatingReplQueues() {
- if (!shouldPopulateReplQueues()) {
- return false;
- }
- return initializedReplQueues;
- }
-
- private boolean shouldPopulateReplQueues() {
- if(haContext == null || haContext.getState() == null)
- return false;
- return haContext.getState().shouldPopulateReplQueues();
- }
-
@Override
- public void incrementSafeBlockCount(int replication) {
+ public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@@ -6233,11 -6150,11 +6222,16 @@@
return cacheManager;
}
+ /** @return the ErasureCodingPolicyManager. */
+ public ErasureCodingPolicyManager getErasureCodingPolicyManager() {
+ return ecPolicyManager;
+ }
+
+ @Override
+ public HAContext getHAContext() {
+ return haContext;
+ }
+
@Override // NameNodeMXBean
public String getCorruptFiles() {
List<String> list = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 6f7b702,8565522..c765edc
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@@ -664,37 -608,12 +664,24 @@@ public class INodeFile extends INodeWit
return counts;
}
+ /**
+ * Compute the quota of a striped file. Note that currently EC files do not
+ * support append/hflush/hsync, thus the file length recorded in snapshots
+ * should be the same as the current file length.
+ */
+ public final QuotaCounts computeQuotaUsageWithStriped(
+ BlockStoragePolicy bsp, QuotaCounts counts) {
+ counts.addNameSpace(1);
+ counts.add(storagespaceConsumed(bsp));
+ return counts;
+ }
+
@Override
public final ContentSummaryComputationContext computeContentSummary(
- final ContentSummaryComputationContext summary) {
+ int snapshotId, final ContentSummaryComputationContext summary) {
final ContentCounts counts = summary.getCounts();
- FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
- final long fileLen;
- if (sf == null) {
- fileLen = computeFileSize();
- counts.addContent(Content.FILE, 1);
- } else {
- final FileDiffList diffs = sf.getDiffs();
- final int n = diffs.asList().size();
- counts.addContent(Content.FILE, n);
- if (n > 0 && sf.isCurrentFileDeleted()) {
- fileLen = diffs.getLast().getFileSize();
- } else {
- fileLen = computeFileSize();
- }
- }
+ counts.addContent(Content.FILE, 1);
+ final long fileLen = computeFileSize(snapshotId);
counts.addContent(Content.LENGTH, fileLen);
counts.addContent(Content.DISKSPACE, storagespaceConsumed(null)
.getStorageSpace());
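
The computeQuotaUsageWithStriped method above charges one namespace object plus the storage space returned by storagespaceConsumed(bsp). As a rough illustration of how a striped file's logical length maps to raw storage, here is a back-of-the-envelope sketch for a file made up only of full stripes; the method name and the full-stripe assumption are hypothetical, and the real accounting also has to handle partial stripes and per-cell rounding.

  /**
   * Rough raw-storage estimate for a striped file consisting only of full
   * stripes: every numDataUnits bytes of user data are stored together with
   * numParityUnits bytes of parity. Illustrative only.
   */
  static long estimateRawSpace(long fileLen, int numDataUnits, int numParityUnits) {
    return fileLen / numDataUnits * (numDataUnits + numParityUnits);
  }

  // Example: a 6 GB file under a (6 data, 3 parity) schema occupies roughly
  // 9 GB of raw storage: estimateRawSpace(6L << 30, 6, 3) == 9L << 30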
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index e1702d9,5bc4033..b1012c2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@@ -64,4 -52,5 +65,6 @@@ public interface Namesystem extends RwL
boolean isInSnapshot(BlockInfo blockUC);
CacheManager getCacheManager();
++
+ HAContext getHAContext();
-}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 252844c,06a8219..98deed2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@@ -39,15 -39,11 +39,12 @@@ public interface SafeMode
*/
public boolean isInStartupSafeMode();
- /** Check whether replication queues are being populated. */
- public boolean isPopulatingReplQueues();
-
/**
* Increment number of blocks that reached minimal replication.
- * @param replication current replication
+ * @param replication current replication
+ * @param storedBlock current stored Block
*/
- public void incrementSafeBlockCount(int replication);
+ public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
/** Decrement number of blocks that reached minimal replication. */
public void decrementSafeBlockCount(BlockInfo b);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 450d981,cf21411..ae23783
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@@ -243,15 -242,13 +243,15 @@@ public class FSImageFormatPBSnapshot
FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
pbf.getFileSize());
List<BlockProto> bpl = pbf.getBlocksList();
+ // in file diff there can only be contiguous blocks
BlockInfo[] blocks = new BlockInfo[bpl.size()];
for(int j = 0, e = bpl.size(); j < e; ++j) {
- Block blk = PBHelper.convert(bpl.get(j));
+ Block blk = PBHelperClient.convert(bpl.get(j));
BlockInfo storedBlock = bm.getStoredBlock(blk);
if(storedBlock == null) {
- storedBlock = bm.addBlockCollection(
- new BlockInfoContiguous(blk, copy.getFileReplication()), file);
+ storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
+ .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
+ copy.getFileReplication()), file);
}
blocks[j] = storedBlock;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
index 6c06a8d,0000000..0499a2b
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
@@@ -1,38 -1,0 +1,39 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
+import org.apache.hadoop.cli.util.CLICommandTypes;
+import org.apache.hadoop.cli.util.CLITestCmd;
+import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.cli.util.ErasureCodingCliCmdExecutor;
++import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.tools.erasurecode.ECCli;
+
+public class CLITestCmdErasureCoding extends CLITestCmd {
+ public CLITestCmdErasureCoding(String str, CLICommandTypes type) {
+ super(str, type);
+ }
+
+ @Override
- public CommandExecutor getExecutor(String tag) throws IllegalArgumentException {
++ public CommandExecutor getExecutor(String tag, Configuration conf) throws IllegalArgumentException {
+ if (getType() instanceof CLICommandErasureCodingCli)
+ return new ErasureCodingCliCmdExecutor(tag, new ECCli());
- return super.getExecutor(tag);
++ return super.getExecutor(tag, conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
index dfefb66,0000000..29ec98e
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
@@@ -1,115 -1,0 +1,115 @@@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommand;
+import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
+import org.apache.hadoop.cli.util.CommandExecutor.Result;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class TestErasureCodingCLI extends CLITestHelper {
+ private final int NUM_OF_DATANODES = 3;
+ private MiniDFSCluster dfsCluster = null;
+ private FileSystem fs = null;
+ private String namenode = null;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ dfsCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(NUM_OF_DATANODES).build();
+ dfsCluster.waitClusterUp();
+ namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
+
+ username = System.getProperty("user.name");
+
+ fs = dfsCluster.getFileSystem();
+ }
+
+ @Override
+ protected String getTestFile() {
+ return "testErasureCodingConf.xml";
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ Thread.sleep(2000);
+ super.tearDown();
+ }
+
+ @Override
+ protected String expandCommand(final String cmd) {
+ String expCmd = cmd;
+ expCmd = expCmd.replaceAll("NAMENODE", namenode);
+ expCmd = expCmd.replaceAll("#LF#", System.getProperty("line.separator"));
+ expCmd = super.expandCommand(expCmd);
+ return expCmd;
+ }
+
+ @Override
+ protected TestConfigFileParser getConfigParser() {
+ return new TestErasureCodingAdmin();
+ }
+
+ private class TestErasureCodingAdmin extends
+ CLITestHelper.TestConfigFileParser {
+ @Override
+ public void endElement(String uri, String localName, String qName)
+ throws SAXException {
+ if (qName.equals("ec-admin-command")) {
+ if (testCommands != null) {
+ testCommands.add(new CLITestCmdErasureCoding(charString,
+ new CLICommandErasureCodingCli()));
+ } else if (cleanupCommands != null) {
+ cleanupCommands.add(new CLITestCmdErasureCoding(charString,
+ new CLICommandErasureCodingCli()));
+ }
+ } else {
+ super.endElement(uri, localName, qName);
+ }
+ }
+ }
+
+ @Override
+ protected Result execute(CLICommand cmd) throws Exception {
- return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
++ return cmd.getExecutor(namenode, conf).executeCommand(cmd.getCmd());
+ }
+
+ @Test
+ @Override
+ public void testAll() {
+ super.testAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3551055,a7e80ca..12d4811
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@@ -66,14 -66,9 +66,15 @@@ import java.util.Set
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
+ import org.apache.commons.lang.UnhandledException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@@ -141,10 -133,8 +142,11 @@@ import org.apache.hadoop.hdfs.server.na
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+ import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.NetUtils;
@@@ -1870,150 -1858,21 +1872,168 @@@ public class DFSTestUtil
}
}
+ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+ Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+ ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+ receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+ StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+ reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+ return reports;
+ }
+
+ /**
+ * Creates the metadata of a file in striped layout. This method only
+ * manipulates the NameNode state without injecting data into DataNodes.
+ * You should disable the periodic heartbeat before using this.
+ * @param file Path of the file to create
+ * @param dir Parent path of the file
+ * @param numBlocks Number of striped block groups to add to the file
+ * @param numStripesPerBlk Number of striped cells in each block
+ * @param toMkdir whether to create the parent dir and set its EC policy
+ */
+ public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
+ int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ // If outer test already set EC policy, dir should be left as null
+ if (toMkdir) {
+ assert dir != null;
+ dfs.mkdirs(dir);
+ try {
+ dfs.getClient().setErasureCodingPolicy(dir.toString(), null);
+ } catch (IOException e) {
+ if (!e.getMessage().contains("non-empty directory")) {
+ throw e;
+ }
+ }
+ }
+
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1); // create an empty file
+
+ FSNamesystem ns = cluster.getNamesystem();
+ FSDirectory fsdir = ns.getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+ ExtendedBlock previous = null;
+ for (int i = 0; i < numBlocks; i++) {
+ Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
+ file.toString(), fileNode, dfs.getClient().getClientName(),
+ previous, numStripesPerBlk);
+ previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+ }
+
+ dfs.getClient().namenode.complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+
+ /**
+ * Adds a striped block group to a file. This method only manipulates the
+ * NameNode state of the file and the block without injecting data into
+ * DataNodes. It does, however, mimic incremental block reports.
+ * You should disable periodic heartbeats before using this.
+ * @param dataNodes List of DataNodes to host the striped block group
+ * @param previous Previous block in the file
+ * @param numStripes Number of stripes in each block group
+ * @return The added block group
+ */
+ public static Block addStripedBlockToFile(List<DataNode> dataNodes,
+ DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode,
+ String clientName, ExtendedBlock previous, int numStripes)
+ throws Exception {
+ fs.getClient().namenode.addBlock(file, clientName, previous, null,
+ fileNode.getId(), null);
+
+ final BlockInfo lastBlock = fileNode.getLastBlock();
+ final int groupSize = fileNode.getPreferredBlockReplication();
+ assert dataNodes.size() >= groupSize;
+ // 1. RECEIVING_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i,
+ numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+
+ lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
+ return lastBlock;
+ }
+
+ /**
+ * Because DFSStripedOutputStream does not currently support hflush/hsync,
+ * tests can use this method to flush all the buffered data to DataNodes.
+ */
+ public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
+ throws IOException {
+ out.flushInternal();
+ return out.getBlock();
+ }
+
+ /**
+ * Verify that the blocks in a striped block group are on different nodes, and
+ * that every internal block exists.
+ */
+ public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
+ int groupSize) {
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ assert lb instanceof LocatedStripedBlock;
+ HashSet<DatanodeInfo> locs = new HashSet<>();
+ for (DatanodeInfo datanodeInfo : lb.getLocations()) {
+ locs.add(datanodeInfo);
+ }
+ assertEquals(groupSize, lb.getLocations().length);
+ assertEquals(groupSize, locs.size());
+
+ // verify that every internal block exists
+ int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+ assertEquals(groupSize, blockIndices.length);
+ HashSet<Integer> found = new HashSet<>();
+ for (int index : blockIndices) {
+ assert index >= 0;
+ found.add(index);
+ }
+ assertEquals(groupSize, found.size());
+ }
+ }
++
+ public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+ LOG.info("Waiting for " + metricName +
+ " to reach value " + expectedValue +
+ ", current value = " + currentValue);
+ return currentValue == expectedValue;
+ } catch (Exception e) {
+ throw new UnhandledException("Test failed due to unexpected exception", e);
+ }
+ }
+ }, 1000, Integer.MAX_VALUE);
+ }
}
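A minimal usage sketch for the DFSTestUtil helpers added above. The directory
name, stripe count, and group size below are illustrative assumptions rather
than values taken from this patch, and the usual org.apache.hadoop.fs.Path and
LocatedBlocks imports are assumed.

    // Hypothetical test snippet; assumes a running MiniDFSCluster ("cluster")
    // with periodic heartbeats disabled, as createStripedFile()'s javadoc requires.
    Path dir = new Path("/ecDir");           // illustrative directory
    Path file = new Path(dir, "striped");    // illustrative file
    // Create NameNode-only metadata: one block group, two stripes per block,
    // creating the parent directory and setting the EC policy on it first.
    DFSTestUtil.createStripedFile(cluster, file, dir, 1, 2, true);
    LocatedBlocks lbs = cluster.getFileSystem().getClient()
        .getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
    // 9 = 6 data + 3 parity, assuming the system default RS-6-3 policy.
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, 9);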
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index 50f98a3,0000000..c28bff8
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@@ -1,160 -1,0 +1,163 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
++import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
+
+public class TestWriteStripedFileWithFailure {
+ public static final Log LOG = LogFactory
+ .getLog(TestWriteStripedFileWithFailure.class);
+ private static MiniDFSCluster cluster;
+ private static FileSystem fs;
+ private static Configuration conf = new HdfsConfiguration();
+ private final int smallFileLength = blockSize * dataBlocks - 123;
+ private final int largeFileLength = blockSize * dataBlocks + 123;
+ private final int[] fileLengths = {smallFileLength, largeFileLength};
+
+ public void setup() throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
+ fs = cluster.getFileSystem();
+ }
+
+ public void tearDown() throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ // Test writing a file with some DataNode failures
++ // TODO: enable this test after HDFS-8704 and HDFS-9040
++ @Ignore
+ @Test(timeout = 300000)
+ public void testWriteStripedFileWithDNFailure() throws IOException {
+ for (int fileLength : fileLengths) {
+ for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+ for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+ try {
+ // set up a new cluster with no dead DataNodes
+ setup();
+ writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum);
+ } catch (IOException ioe) {
+ String fileType = fileLength < (blockSize * dataBlocks) ?
+ "smallFile" : "largeFile";
+ LOG.error("Failed to write file with DN failure:"
+ + " fileType = "+ fileType
+ + ", dataDelNum = " + dataDelNum
+ + ", parityDelNum = " + parityDelNum);
+ throw ioe;
+ } finally {
+ // tear down the cluster
+ tearDown();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Test writing a file while shutting down some DNs (data DNs, parity DNs, or both).
+ * @param fileLength file length
+ * @param dataDNFailureNum the number of data DNs to shut down
+ * @param parityDNFailureNum the number of parity DNs to shut down
+ * @throws IOException
+ */
+ private void writeFileWithDNFailure(int fileLength,
+ int dataDNFailureNum, int parityDNFailureNum) throws IOException {
+ String fileType = fileLength < (blockSize * dataBlocks) ?
+ "smallFile" : "largeFile";
+ String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum
+ + "_" + fileType;
+ LOG.info("writeFileWithDNFailure: file = " + src
+ + ", fileType = " + fileType
+ + ", dataDNFailureNum = " + dataDNFailureNum
+ + ", parityDNFailureNum = " + parityDNFailureNum);
+
+ Path srcPath = new Path(src);
+ final AtomicInteger pos = new AtomicInteger();
+ final FSDataOutputStream out = fs.create(srcPath);
+ final DFSStripedOutputStream stripedOut
+ = (DFSStripedOutputStream)out.getWrappedStream();
+
+ int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
+ dataDNFailureNum);
+ Assert.assertNotNull(dataDNFailureIndices);
+ int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks,
+ dataBlocks + parityBlocks, parityDNFailureNum);
+ Assert.assertNotNull(parityDNFailureIndices);
+
+ int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum];
+ System.arraycopy(dataDNFailureIndices, 0, failedDataNodes,
+ 0, dataDNFailureIndices.length);
+ System.arraycopy(parityDNFailureIndices, 0, failedDataNodes,
+ dataDNFailureIndices.length, parityDNFailureIndices.length);
+
+ final int killPos = fileLength/2;
+ for (; pos.get() < fileLength; ) {
+ final int i = pos.getAndIncrement();
+ if (i == killPos) {
+ for(int failedDn : failedDataNodes) {
+ StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos);
+ }
+ }
+ write(out, i);
+ }
+ out.close();
+
+ // make sure the expected number of DataNodes have been killed
+ int dnFailureNum = dataDNFailureNum + parityDNFailureNum;
+ Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum);
+
+ byte[] smallBuf = new byte[1024];
+ byte[] largeBuf = new byte[fileLength + 100];
+ final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+ StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+ StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+ StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+ smallBuf);
+ StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+ // delete the file
+ fs.delete(srcPath, true);
+ }
+
+ void write(FSDataOutputStream out, int i) throws IOException {
+ try {
+ out.write(StripedFileTestUtil.getByte(i));
+ } catch (IOException e) {
+ throw new IOException("Failed at i=" + i, e);
+ }
+ }
+}
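For readability, the read-back checks at the end of writeFileWithDNFailure()
could be collected into a single helper. The sketch below is a hypothetical
refactoring built only from the StripedFileTestUtil calls already used above,
not part of this patch.

    // Hypothetical helper; verifies a fully written striped file of the given length.
    static void verifyStripedReadBack(FileSystem fs, Path srcPath, int fileLength)
        throws IOException {
      byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
      StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
      StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
      // stateful read with a small buffer, pread with a buffer larger than the file
      StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
          new byte[1024]);
      StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected,
          new byte[fileLength + 100]);
    }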
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 0a27614,851e5b9..00a4575
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@@ -516,16 -489,16 +516,16 @@@ public class TestPBHelper
@Test
public void testConvertLocatedBlock() {
LocatedBlock lb = createLocatedBlock();
- LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
- LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
- LocatedBlockProto lbProto = PBHelperClient.convert(lb);
- LocatedBlock lb2 = PBHelperClient.convert(lbProto);
++ LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb);
++ LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockNoStorageMedia() {
LocatedBlock lb = createLocatedBlockNoStorageMedia();
- LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
- LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
- LocatedBlockProto lbProto = PBHelperClient.convert(lb);
- LocatedBlock lb2 = PBHelperClient.convert(lbProto);
++ LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb);
++ LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@@@ -535,8 -508,8 +535,8 @@@
for (int i=0;i<3;i++) {
lbl.add(createLocatedBlock());
}
- List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl);
- List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl);
- List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlock2(lbl);
- List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlock(lbpl);
++ List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlocks2(lbl);
++ List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlocks(lbpl);
assertEquals(lbl.size(), lbl2.size());
for (int i=0;i<lbl.size();i++) {
compare(lbl.get(i), lbl2.get(i));
@@@ -549,8 -522,8 +549,8 @@@
for (int i=0;i<3;i++) {
lbl[i] = createLocatedBlock();
}
- LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl);
- LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl);
- LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlock(lbl);
- LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlock(lbpl);
++ LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlocks(lbl);
++ LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlocks(lbpl);
assertEquals(lbl.length, lbl2.length);
for (int i=0;i<lbl.length;i++) {
compare(lbl[i], lbl2[i]);
@@@ -664,99 -637,6 +664,99 @@@
.setType(AclEntryType.OTHER).build();
AclStatus s = new AclStatus.Builder().owner("foo").group("bar").addEntry(e)
.build();
- Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s)));
+ Assert.assertEquals(s, PBHelperClient.convert(PBHelperClient.convert(s)));
}
+
+ @Test
+ public void testBlockECRecoveryCommand() {
+ DatanodeInfo[] dnInfos0 = new DatanodeInfo[] {
+ DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+ DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s00"));
+ DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s01"));
+ DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
+ targetDnInfos_0, targetDnInfos_1 };
+ short[] liveBlkIndices0 = new short[2];
+ BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
+ new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
+ liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy());
+ DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
+ DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+ DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s02"));
+ DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s03"));
+ DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
+ targetDnInfos_2, targetDnInfos_3 };
+ short[] liveBlkIndices1 = new short[2];
+ BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
+ new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
+ liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy());
+ List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
+ blkRecoveryInfosList.add(blkECRecoveryInfo0);
+ blkRecoveryInfosList.add(blkECRecoveryInfo1);
+ BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
+ DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
+ .convert(blkECRecoveryCmd);
+ blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
+ Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
+ .iterator();
+ assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
+ assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
+ }
+
+ private void assertBlockECRecoveryInfoEquals(
+ BlockECRecoveryInfo blkECRecoveryInfo1,
+ BlockECRecoveryInfo blkECRecoveryInfo2) {
+ assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
+ blkECRecoveryInfo2.getExtendedBlock());
+
+ DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos();
+ assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2);
+
+ DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos();
+ assertDnInfosEqual(targetDnInfos1, targetDnInfos2);
+
+ String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs();
+ String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs();
+ assertEquals(targetStorageIDs1.length, targetStorageIDs2.length);
+ for (int i = 0; i < targetStorageIDs1.length; i++) {
+ assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
+ }
+
+ short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
+ short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
+ for (int i = 0; i < liveBlockIndices1.length; i++) {
+ assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
+ }
+
+ ErasureCodingPolicy ecPolicy1 = blkECRecoveryInfo1.getErasureCodingPolicy();
+ ErasureCodingPolicy ecPolicy2 = blkECRecoveryInfo2.getErasureCodingPolicy();
+ // Compare each ECPolicy against the system default ECPolicy, since the
+ // system default policy was used to build both recovery infos in this test
+ compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy1);
+ compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy2);
+ }
+
+ private void compareECPolicies(ErasureCodingPolicy ecPolicy1, ErasureCodingPolicy ecPolicy2) {
+ assertEquals(ecPolicy1.getName(), ecPolicy2.getName());
+ assertEquals(ecPolicy1.getNumDataUnits(), ecPolicy2.getNumDataUnits());
+ assertEquals(ecPolicy1.getNumParityUnits(), ecPolicy2.getNumParityUnits());
+ }
+
+ private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
+ DatanodeInfo[] dnInfos2) {
+ assertEquals(dnInfos1.length, dnInfos2.length);
+ for (int i = 0; i < dnInfos1.length; i++) {
+ compare(dnInfos1[i], dnInfos2[i]);
+ }
+ }
}
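One small gap worth noting in assertBlockECRecoveryInfoEquals() above: unlike
the storage-ID comparison, the live-block-indices loop never checks that the
two arrays have the same length. A hedged sketch of the stricter comparison,
not part of this patch:

    short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
    short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
    // assert equal lengths before comparing element by element
    assertEquals(liveBlockIndices1.length, liveBlockIndices2.length);
    for (int i = 0; i < liveBlockIndices1.length; i++) {
      assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
    }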