Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:23:04 UTC
[30/52] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index dabae2c,0000000..7c64b37
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@@ -1,1014 -1,0 +1,1016 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSPacket;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
+
+/**
+ * ErasureCodingWorker handles the erasure coding recovery work commands. These
+ * commands are issued by the NameNode as part of the DataNode's heartbeat
+ * response. BPOfferService delegates the work to this class for handling EC
+ * commands.
+ */
+public final class ErasureCodingWorker {
+ private static final Log LOG = DataNode.LOG;
+
+ private final DataNode datanode;
+ private final Configuration conf;
+
+ private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
+ private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
+ private final int STRIPED_READ_TIMEOUT_MILLIS;
+ private final int STRIPED_READ_BUFFER_SIZE;
+
+ public ErasureCodingWorker(Configuration conf, DataNode datanode) {
+ this.datanode = datanode;
+ this.conf = conf;
+
+ STRIPED_READ_TIMEOUT_MILLIS = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+ initializeStripedReadThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
+ STRIPED_READ_BUFFER_SIZE = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+ initializeStripedBlkRecoveryThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
+ }
+
+ private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
+ return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
+ }
+
+ private void initializeStripedReadThreadPool(int num) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using striped reads; pool threads=" + num);
+ }
+ STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedRead-" + threadIndex.getAndIncrement());
+ return t;
+ }
+ }, new ThreadPoolExecutor.CallerRunsPolicy() {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+ LOG.info("Execution for striped reading rejected, "
+ + "Executing in current thread");
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+
+ private void initializeStripedBlkRecoveryThreadPool(int num) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using striped block recovery; pool threads=" + num);
+ }
+ STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIdx = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
+ return t;
+ }
+ });
+ STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+
+ /**
+ * Handles the Erasure Coding recovery work commands.
+ *
+ * @param ecTasks
+ * BlockECRecoveryInfo tasks describing the block groups to recover
+ */
+ public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
+ for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
+ try {
+ STRIPED_BLK_RECOVERY_THREAD_POOL
+ .submit(new ReconstructAndTransferBlock(recoveryInfo));
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block "
+ + recoveryInfo.getExtendedBlock().getLocalBlock(), e);
+ }
+ }
+ }
+
+ /**
+ * ReconstructAndTransferBlock recovers one or more missing striped blocks in a
+ * striped block group. The number of live striped blocks must be no less
+ * than the number of data blocks.
+ *
+ * | <- Striped Block Group -> |
+ * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group
+ * | | | |
+ * v v v v
+ * +------+ +------+ +------+ +------+
+ * |cell_0| |cell_1| |cell_2| |cell_3| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_4| |cell_5| |cell_6| |cell_7| ...
+ * +------+ +------+ +------+ +------+
+ * |cell_8| |cell_9| |cell10| |cell11| ...
+ * +------+ +------+ +------+ +------+
+ * ... ... ... ...
+ *
+ *
+ * We recover the striped block group in rounds. In each round we recover
+ * <code>bufferSize</code> bytes until the whole group is done; the
+ * <code>bufferSize</code> is configurable and may be smaller or larger than
+ * the cell size:
+ * step1: read <code>bufferSize</code> bytes from the minimum number of sources
+ * required by recovery.
+ * step2: decode data for targets.
+ * step3: transfer data to targets.
+ *
+ * In step1, we try to read <code>bufferSize</code> bytes from the minimum
+ * number of sources; if a source is corrupt or stale, a read from a new
+ * source is scheduled. The best sources are remembered for the next round
+ * and may be updated in each round.
+ *
+ * In step2, typically if the source blocks we read are all data blocks, we
+ * need to call encode, and if one of them is a parity block, we need to call
+ * decode. Note that we read only once and recover all the missing striped
+ * blocks even when there is more than one.
+ *
+ * In step3, we send the recovered data to the targets by constructing
+ * packets and sending them directly. As with contiguous block replication,
+ * we don't check the packet ack. Since the datanode doing the recovery work
+ * is one of the source datanodes, the recovered data has to be sent
+ * remotely to the targets.
+ *
+ * There are some points where we can make further improvements in the next
+ * phase:
+ * 1. We can read the block file directly on the local datanode;
+ * currently we use a remote block reader. (Note that short-circuit reads
+ * are not a good choice; see the inline comments.)
+ * 2. Should we check the packet ack for EC recovery? Since EC recovery
+ * is more expensive than contiguous block replication and needs to
+ * read from several other datanodes, should we make sure the
+ * recovered result is received by the targets?
+ */
+ private class ReconstructAndTransferBlock implements Runnable {
+ private final int dataBlkNum;
+ private final int parityBlkNum;
+ private final int cellSize;
+
+ private RawErasureDecoder decoder;
+
+ // Striped read buffer size
+ private int bufferSize;
+
+ private final ExtendedBlock blockGroup;
+ private final int minRequiredSources;
+ // position in striped internal block
+ private long positionInBlock;
+
+ // sources
+ private final short[] liveIndices;
+ private final DatanodeInfo[] sources;
+
+ private final List<StripedReader> stripedReaders;
+
+ // The buffers and indices for striped blocks whose length is 0
+ private ByteBuffer[] zeroStripeBuffers;
+ private short[] zeroStripeIndices;
+
+ // targets
+ private final DatanodeInfo[] targets;
+ private final StorageType[] targetStorageTypes;
+
+ private final short[] targetIndices;
+ private final ByteBuffer[] targetBuffers;
+
+ private final Socket[] targetSockets;
+ private final DataOutputStream[] targetOutputStreams;
+ private final DataInputStream[] targetInputStreams;
+
+ private final long[] blockOffset4Targets;
+ private final long[] seqNo4Targets;
+
+ private final static int WRITE_PACKET_SIZE = 64 * 1024;
+ private DataChecksum checksum;
+ private int maxChunksPerPacket;
+ private byte[] packetBuf;
+ private byte[] checksumBuf;
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ private final CachingStrategy cachingStrategy;
+
+ private final Map<Future<Void>, Integer> futures = new HashMap<>();
+ private final CompletionService<Void> readService =
+ new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
+
+ ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
+ ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy();
+ dataBlkNum = ecPolicy.getNumDataUnits();
+ parityBlkNum = ecPolicy.getNumParityUnits();
+ cellSize = ecPolicy.getCellSize();
+
+ blockGroup = recoveryInfo.getExtendedBlock();
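+ // A block group shorter than one full stripe spans fewer than dataBlkNum
+ // cells, so fewer source blocks are required for reconstruction.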
+ final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
+ minRequiredSources = Math.min(cellsNum, dataBlkNum);
+
+ liveIndices = recoveryInfo.getLiveBlockIndices();
+ sources = recoveryInfo.getSourceDnInfos();
+ stripedReaders = new ArrayList<>(sources.length);
+
+ Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
+ "No enough live striped blocks.");
+ Preconditions.checkArgument(liveIndices.length == sources.length,
+ "liveBlockIndices and source dns should match");
+
+ if (minRequiredSources < dataBlkNum) {
+ zeroStripeBuffers =
+ new ByteBuffer[dataBlkNum - minRequiredSources];
+ zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
+ }
+
+ targets = recoveryInfo.getTargetDnInfos();
+ targetStorageTypes = recoveryInfo.getTargetStorageTypes();
+ targetIndices = new short[targets.length];
+ targetBuffers = new ByteBuffer[targets.length];
+
+ Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+ "Too much missed striped blocks.");
+
+ targetSockets = new Socket[targets.length];
+ targetOutputStreams = new DataOutputStream[targets.length];
+ targetInputStreams = new DataInputStream[targets.length];
+
+ blockOffset4Targets = new long[targets.length];
+ seqNo4Targets = new long[targets.length];
+
+ for (int i = 0; i < targets.length; i++) {
+ blockOffset4Targets[i] = 0;
+ seqNo4Targets[i] = 0;
+ }
+
+ getTargetIndices();
+ cachingStrategy = CachingStrategy.newDefaultStrategy();
+ }
+
+ private ByteBuffer allocateBuffer(int length) {
+ return ByteBuffer.allocate(length);
+ }
+
+ private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
+ dataBlkNum, i);
+ }
+
+ private long getBlockLen(ExtendedBlock blockGroup, int i) {
+ return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
+ cellSize, dataBlkNum, i);
+ }
+
+ /**
+ * StripedReader is used to read from one source DN; it contains a block
+ * reader, a buffer and the striped block index.
+ * A StripedReader is allocated only once per source, and the readers keep
+ * the same array order as the sources. Typically we only need to allocate
+ * the minimum number (minRequiredSources) of StripedReaders, and allocate a
+ * new one for a new source DN if an existing DN becomes invalid or slow.
+ * If a source DN is corrupt, its blockReader is set to
+ * null and is never read from again.
+ *
+ * @param i the array index of sources
+ * @param offsetInBlock offset within the internal block
+ * @return StripedReader
+ */
+ private StripedReader addStripedReader(int i, long offsetInBlock) {
+ StripedReader reader = new StripedReader(liveIndices[i]);
+ stripedReaders.add(reader);
+
+ BlockReader blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+ if (blockReader != null) {
+ initChecksumAndBufferSizeIfNeeded(blockReader);
+ reader.blockReader = blockReader;
+ }
+ reader.buffer = allocateBuffer(bufferSize);
+ return reader;
+ }
+
+ @Override
+ public void run() {
+ datanode.incrementXmitsInProgress();
+ try {
+ // Store the array indices of source DNs we have read successfully.
+ // In each read iteration, the success list may be updated if
+ // some source DN is corrupted or slow; the updated success list
+ // is then used for the next read iteration.
+ int[] success = new int[minRequiredSources];
+
+ int nsuccess = 0;
+ for (int i = 0;
+ i < sources.length && nsuccess < minRequiredSources; i++) {
+ StripedReader reader = addStripedReader(i, 0);
+ if (reader.blockReader != null) {
+ success[nsuccess++] = i;
+ }
+ }
+
+ if (nsuccess < minRequiredSources) {
+ String error = "Can't find minimum sources required by "
+ + "recovery, block id: " + blockGroup.getBlockId();
+ throw new IOException(error);
+ }
+
+ if (zeroStripeBuffers != null) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ zeroStripeBuffers[i] = allocateBuffer(bufferSize);
+ }
+ }
+
+ for (int i = 0; i < targets.length; i++) {
+ targetBuffers[i] = allocateBuffer(bufferSize);
+ }
+
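+ // Each chunk carries bytesPerChecksum data bytes plus its checksum; pack
+ // as many whole chunks as fit in a packet after the header (at least one).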
+ checksumSize = checksum.getChecksumSize();
+ int chunkSize = bytesPerChecksum + checksumSize;
+ maxChunksPerPacket = Math.max(
+ (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
+ int maxPacketSize = chunkSize * maxChunksPerPacket
+ + PacketHeader.PKT_MAX_HEADER_LEN;
+
+ packetBuf = new byte[maxPacketSize];
+ checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
+
+ // targetsStatus stores whether each target has succeeded. Once a target
+ // fails (invalid DN or transfer failure), no more data is
+ // transferred to it.
+ boolean[] targetsStatus = new boolean[targets.length];
+ if (initTargetStreams(targetsStatus) == 0) {
+ String error = "All targets are failed.";
+ throw new IOException(error);
+ }
+
+ long firstStripedBlockLength = getBlockLen(blockGroup, 0);
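+ // Reconstruct in rounds of at most bufferSize bytes; no internal block is
+ // longer than internal block 0, so its length bounds the loop.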
+ while (positionInBlock < firstStripedBlockLength) {
+ int toRead = Math.min(
+ bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+ // step1: read from minimum source DNs required for reconstruction.
+ // The returned success list contains the source DNs we actually read from
+ success = readMinimumStripedData4Recovery(success);
+
+ // step2: decode to reconstruct targets
+ long remaining = firstStripedBlockLength - positionInBlock;
+ int toRecoverLen = remaining < bufferSize ?
+ (int)remaining : bufferSize;
+ recoverTargets(success, targetsStatus, toRecoverLen);
+
+ // step3: transfer data
+ if (transferData2Targets(targetsStatus) == 0) {
+ String error = "Transfer failed for all targets.";
+ throw new IOException(error);
+ }
+
+ clearBuffers();
+ positionInBlock += toRead;
+ }
+
+ endTargetBlocks(targetsStatus);
+
+ // Currently we don't check the acks for packets; this is similar to
+ // block replication.
+ } catch (Throwable e) {
+ LOG.warn("Failed to recover striped block: " + blockGroup, e);
+ } finally {
+ datanode.decrementXmitsInProgress();
+ // close block readers
+ for (StripedReader stripedReader : stripedReaders) {
+ closeBlockReader(stripedReader.blockReader);
+ }
+ for (int i = 0; i < targets.length; i++) {
+ IOUtils.closeStream(targetOutputStreams[i]);
+ IOUtils.closeStream(targetInputStreams[i]);
+ IOUtils.closeStream(targetSockets[i]);
+ }
+ }
+ }
+
+ // init checksum from block reader
+ private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
+ if (checksum == null) {
+ checksum = blockReader.getDataChecksum();
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ // Keep bufferSize a multiple of bytesPerChecksum (at least one chunk)
+ int readBufferSize = STRIPED_READ_BUFFER_SIZE;
+ bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
+ readBufferSize - readBufferSize % bytesPerChecksum;
+ } else {
+ assert blockReader.getDataChecksum().equals(checksum);
+ }
+ }
+
+ private void getTargetIndices() {
+ BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
+ for (int i = 0; i < sources.length; i++) {
+ bitset.set(liveIndices[i]);
+ }
+ int m = 0;
+ int k = 0;
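+ // Indices not set in the bitset are the missing internal blocks: non-empty
+ // ones become recovery targets, zero-length ones are tracked separately.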
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (!bitset.get(i)) {
+ if (getBlockLen(blockGroup, i) > 0) {
+ if (m < targets.length) {
+ targetIndices[m++] = (short)i;
+ }
+ } else {
+ zeroStripeIndices[k++] = (short)i;
+ }
+ }
+ }
+ }
+
+ private long getReadLength(int index) {
+ long blockLen = getBlockLen(blockGroup, index);
+ long remaining = blockLen - positionInBlock;
+ return remaining > bufferSize ? bufferSize : remaining;
+ }
+
+ /**
+ * Read from the minimum number of source DNs required for reconstruction
+ * in this iteration.
+ * First try the success list, which contains the DNs we consider best.
+ * If a source DN is corrupt or slow, try to read from some other source DN
+ * and update the success list.
+ *
+ * The updated success list is remembered and returned for the following
+ * operations and the next read iteration.
+ *
+ * @param success the initial success list of source DNs we think best
+ * @return updated success list of source DNs we actually read from
+ * @throws IOException
+ */
+ private int[] readMinimumStripedData4Recovery(final int[] success)
+ throws IOException {
+ int nsuccess = 0;
+ int[] newSuccess = new int[minRequiredSources];
+ BitSet used = new BitSet(sources.length);
+ /*
+ * Read from the minimum number of source DNs required; the success list
+ * contains the source DNs we think are best.
+ */
+ for (int i = 0; i < minRequiredSources; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ if (getReadLength(liveIndices[success[i]]) > 0) {
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, success[i]);
+ } else {
+ // If the read length is 0, we don't need to do real read
+ reader.buffer.position(0);
+ newSuccess[nsuccess++] = success[i];
+ }
+ used.set(success[i]);
+ }
+
+ while (!futures.isEmpty()) {
+ try {
+ StripingChunkReadResult result =
+ StripedBlockUtil.getNextCompletedStripedRead(
+ readService, futures, STRIPED_READ_TIMEOUT_MILLIS);
+ int resultIndex = -1;
+ if (result.state == StripingChunkReadResult.SUCCESSFUL) {
+ resultIndex = result.index;
+ } else if (result.state == StripingChunkReadResult.FAILED) {
+ // If read failed for some source DN, we should not use it anymore
+ // and schedule read from another source DN.
+ StripedReader failedReader = stripedReaders.get(result.index);
+ closeBlockReader(failedReader.blockReader);
+ failedReader.blockReader = null;
+ resultIndex = scheduleNewRead(used);
+ } else if (result.state == StripingChunkReadResult.TIMEOUT) {
+ // If timeout, we also schedule a new read.
+ resultIndex = scheduleNewRead(used);
+ }
+ if (resultIndex >= 0) {
+ newSuccess[nsuccess++] = resultIndex;
+ if (nsuccess >= minRequiredSources) {
+ // cancel remaining reads if we read successfully from minimum
+ // number of source DNs required by reconstruction.
+ cancelReads(futures.keySet());
+ futures.clear();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Read data interrupted.", e);
+ break;
+ }
+ }
+
+ if (nsuccess < minRequiredSources) {
+ String error = "Can't read data from minimum number of sources "
+ + "required by reconstruction, block id: " + blockGroup.getBlockId();
+ throw new IOException(error);
+ }
+
+ return newSuccess;
+ }
+
+ private void paddingBufferToLen(ByteBuffer buffer, int len) {
+ int toPadding = len - buffer.position();
+ for (int i = 0; i < toPadding; i++) {
+ buffer.put((byte) 0);
+ }
+ }
+
+ // Initialize decoder
+ private void initDecoderIfNecessary() {
+ if (decoder == null) {
+ decoder = newDecoder(dataBlkNum, parityBlkNum);
+ }
+ }
+
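+ // Map the block-group indices of the still-healthy targets into the
+ // decoder's erased-index space via convertIndex4Decode.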
+ private int[] getErasedIndices(boolean[] targetsStatus) {
+ int[] result = new int[targets.length];
+ int m = 0;
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ result[m++] = convertIndex4Decode(targetIndices[i],
+ dataBlkNum, parityBlkNum);
+ }
+ }
+ return Arrays.copyOf(result, m);
+ }
+
+ private void recoverTargets(int[] success, boolean[] targetsStatus,
+ int toRecoverLen) {
+ initDecoderIfNecessary();
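+ // Assemble decoder inputs indexed by convertIndex4Decode: buffers we read
+ // and zero-length stripe buffers are filled in, the rest stay null.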
+ ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+ for (int i = 0; i < success.length; i++) {
+ StripedReader reader = stripedReaders.get(success[i]);
+ ByteBuffer buffer = reader.buffer;
+ paddingBufferToLen(buffer, toRecoverLen);
+ inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] =
+ (ByteBuffer)buffer.flip();
+ }
+ if (success.length < dataBlkNum) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ ByteBuffer buffer = zeroStripeBuffers[i];
+ paddingBufferToLen(buffer, toRecoverLen);
+ int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
+ parityBlkNum);
+ inputs[index] = (ByteBuffer)buffer.flip();
+ }
+ }
+ int[] erasedIndices = getErasedIndices(targetsStatus);
+ ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
+ int m = 0;
+ for (int i = 0; i < targetBuffers.length; i++) {
+ if (targetsStatus[i]) {
+ targetBuffers[i].limit(toRecoverLen);
+ outputs[m++] = targetBuffers[i];
+ }
+ }
+ decoder.decode(inputs, erasedIndices, outputs);
+
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+ long remaining = blockLen - positionInBlock;
+ if (remaining < 0) {
+ targetBuffers[i].limit(0);
+ } else if (remaining < toRecoverLen) {
+ targetBuffers[i].limit((int)remaining);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule a read from some new source DN if an existing DN is corrupt
+ * or slow; this is called from the read iteration.
+ * Initially we may only have <code>minRequiredSources</code>
+ * StripedReaders.
+ * If the position is at the end of the target block, no real read is
+ * needed and the array index of the source DN is returned; otherwise
+ * -1 is returned.
+ *
+ * @param used the source DNs already used in this iteration.
+ * @return the array index of the source DN if no real read is needed,
+ * otherwise -1.
+ */
+ private int scheduleNewRead(BitSet used) {
+ StripedReader reader = null;
+ // step1: initially we may only have <code>minRequiredSources</code>
+ // StripedReaders, and there may be some source DNs we have never
+ // read from before, so try to create a StripedReader for one new source DN
+ // and read from it. If one is found, go to step 3.
+ int m = stripedReaders.size();
+ while (reader == null && m < sources.length) {
+ reader = addStripedReader(m, positionInBlock);
+ if (getReadLength(liveIndices[m]) > 0) {
+ if (reader.blockReader == null) {
+ reader = null;
+ m++;
+ }
+ } else {
+ used.set(m);
+ return m;
+ }
+ }
+
+ // step2: if there is no new source DN we can use, try to find a source
+ // DN we have read from before but which, for some reason (e.g., it was
+ // slow), is not in the success DN list at the beginning of this
+ // iteration and so has not been tried yet. Now we have a chance to
+ // revisit it.
+ for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
+ if (!used.get(i)) {
+ StripedReader r = stripedReaders.get(i);
+ if (getReadLength(liveIndices[i]) > 0) {
+ closeBlockReader(r.blockReader);
+ r.blockReader = newBlockReader(
+ getBlock(blockGroup, liveIndices[i]), positionInBlock,
+ sources[i]);
+ if (r.blockReader != null) {
+ m = i;
+ reader = r;
+ }
+ } else {
+ used.set(i);
+ r.buffer.position(0);
+ return i;
+ }
+ }
+ }
+
+ // step3: schedule the read if we found a usable source DN and a real
+ // read is needed.
+ if (reader != null) {
+ Callable<Void> readCallable = readFromBlock(
+ reader.blockReader, reader.buffer);
+ Future<Void> f = readService.submit(readCallable);
+ futures.put(f, m);
+ used.set(m);
+ }
+
+ return -1;
+ }
+
+ // cancel all reads.
+ private void cancelReads(Collection<Future<Void>> futures) {
+ for (Future<Void> future : futures) {
+ future.cancel(true);
+ }
+ }
+
+ private Callable<Void> readFromBlock(final BlockReader reader,
+ final ByteBuffer buf) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ actualReadFromBlock(reader, buf);
+ return null;
+ } catch (IOException e) {
+ LOG.info(e.getMessage());
+ throw e;
+ }
+ }
+
+ };
+ }
+
+ /**
+ * Read bytes from block
+ */
+ private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
+ throws IOException {
+ int len = buf.remaining();
+ int n = 0;
+ while (n < len) {
+ int nread = reader.read(buf);
+ if (nread <= 0) {
+ break;
+ }
+ n += nread;
+ }
+ }
+
+ // close block reader
+ private void closeBlockReader(BlockReader blockReader) {
+ try {
+ if (blockReader != null) {
+ blockReader.close();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+ return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+ datanode.getDnConf().getConnectToDnViaHostname()));
+ }
+
+ private BlockReader newBlockReader(final ExtendedBlock block,
+ long offsetInBlock, DatanodeInfo dnInfo) {
+ if (offsetInBlock >= block.getNumBytes()) {
+ return null;
+ }
+ try {
+ InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
+ Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
+ block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+ /*
+ * This can be further improved: if the replica is local, we can
+ * read directly from the DN after checking that the replica is in the
+ * FINALIZED state. Note that we should not use short-circuit local reads,
+ * which require domain-socket config on UNIX or legacy config on Windows.
++ *
++ * TODO: add proper tracer
+ */
+ return RemoteBlockReader2.newBlockReader(
+ "dummy", block, blockToken, offsetInBlock,
+ block.getNumBytes() - offsetInBlock, true,
+ "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
- null, cachingStrategy);
++ null, cachingStrategy, null);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ Socket sock = null;
+ final int socketTimeout = datanode.getDnConf().getSocketTimeout();
+ try {
+ sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ NetUtils.connect(sock, addr, socketTimeout);
+ peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
+ sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
+ blockToken, datanodeId);
+ peer.setReadTimeout(socketTimeout);
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ IOUtils.cleanup(LOG, peer);
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Send data to targets
+ */
+ private int transferData2Targets(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ boolean success = false;
+ try {
+ ByteBuffer buffer = targetBuffers[i];
+
+ if (buffer.remaining() == 0) {
+ continue;
+ }
+
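+ // Compute CRCs for the whole reconstructed buffer up front, then slice
+ // them into per-packet checksum runs below.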
+ checksum.calculateChunkedSums(
+ buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
+
+ int ckOff = 0;
+ while (buffer.remaining() > 0) {
+ DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
+ int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
+ int toWrite = buffer.remaining() > maxBytesToPacket ?
+ maxBytesToPacket : buffer.remaining();
+ int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
+ packet.writeChecksum(checksumBuf, ckOff, ckLen);
+ ckOff += ckLen;
+ packet.writeData(buffer, toWrite);
+
+ // Send packet
+ packet.writeTo(targetOutputStreams[i]);
+
+ blockOffset4Targets[i] += toWrite;
+ nsuccess++;
+ success = true;
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ targetsStatus[i] = success;
+ }
+ }
+ return nsuccess;
+ }
+
+ /**
+ * clear all buffers
+ */
+ private void clearBuffers() {
+ for (StripedReader stripedReader : stripedReaders) {
+ if (stripedReader.buffer != null) {
+ stripedReader.buffer.clear();
+ }
+ }
+
+ if (zeroStripeBuffers != null) {
+ for (int i = 0; i < zeroStripeBuffers.length; i++) {
+ zeroStripeBuffers[i].clear();
+ }
+ }
+
+ for (int i = 0; i < targetBuffers.length; i++) {
+ if (targetBuffers[i] != null) {
+ cleanBuffer(targetBuffers[i]);
+ }
+ }
+ }
+
+ private ByteBuffer cleanBuffer(ByteBuffer buffer) {
+ Arrays.fill(buffer.array(), (byte) 0);
+ return (ByteBuffer)buffer.clear();
+ }
+
+ // send an empty packet to mark the end of the block
+ private void endTargetBlocks(boolean[] targetsStatus) {
+ for (int i = 0; i < targets.length; i++) {
+ if (targetsStatus[i]) {
+ try {
+ DFSPacket packet = new DFSPacket(packetBuf, 0,
+ blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
+ packet.writeTo(targetOutputStreams[i]);
+ targetOutputStreams[i].flush();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize output/input streams for transferring data to target
+ * and send create block request.
+ */
+ private int initTargetStreams(boolean[] targetsStatus) {
+ int nsuccess = 0;
+ for (int i = 0; i < targets.length; i++) {
+ Socket socket = null;
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ boolean success = false;
+ try {
+ InetSocketAddress targetAddr =
+ getSocketAddress4Transfer(targets[i]);
+ socket = datanode.newSocket();
+ NetUtils.connect(socket, targetAddr,
+ datanode.getDnConf().getSocketTimeout());
+ socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
+
+ ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
+ Token<BlockTokenIdentifier> blockToken =
+ datanode.getBlockAccessToken(block,
+ EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+ long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
+ OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(socket);
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
+ socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
+
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtil.getSmallBufferSize(conf)));
++ DFSUtilClient.getSmallBufferSize(conf)));
+ in = new DataInputStream(unbufIn);
+
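+ // Set up a single-datanode write pipeline to the target; the recovering
+ // datanode presents itself as the source of the transfer.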
+ DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
+ new Sender(out).writeBlock(block, targetStorageTypes[i],
+ blockToken, "", new DatanodeInfo[]{targets[i]},
+ new StorageType[]{targetStorageTypes[i]}, source,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0,
+ checksum, cachingStrategy, false, false, null);
+
+ targetSockets[i] = socket;
+ targetOutputStreams[i] = out;
+ targetInputStreams[i] = in;
+ nsuccess++;
+ success = true;
+ } catch (Throwable e) {
+ LOG.warn(e.getMessage());
+ } finally {
+ if (!success) {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(socket);
+ }
+ }
+ targetsStatus[i] = success;
+ }
+ return nsuccess;
+ }
+ }
+
+ private static class StripedReader {
+ private final short index; // internal block index
+ private BlockReader blockReader;
+ private ByteBuffer buffer;
+
+ private StripedReader(short index) {
+ this.index = index;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 3e001d3,98af592..67c6fc1
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@@ -180,9 -178,8 +180,10 @@@ class FSDirStatAndListingOp
}
final FileEncryptionInfo feInfo = isReservedName ? null
- : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+ : FSDirEncryptionZoneOp.getFileEncryptionInfo(fsd, inode,
- iip.getPathSnapshotId(), iip);
++ iip.getPathSnapshotId(), iip);
+ final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
+ getErasureCodingPolicy(fsd.getFSNamesystem(), iip);
final LocatedBlocks blocks = bm.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
@@@ -443,12 -440,9 +444,12 @@@
long blocksize = 0;
final boolean isEncrypted;
- final FileEncryptionInfo feInfo = isRawPath ? null :
- fsd.getFileEncryptionInfo(node, snapshot, iip);
+ final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp
+ .getFileEncryptionInfo(fsd, node, snapshot, iip);
+ final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
+ fsd.getFSNamesystem(), iip);
+
if (node.isFile()) {
final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot);
@@@ -500,10 -495,8 +503,10 @@@
long blocksize = 0;
LocatedBlocks loc = null;
final boolean isEncrypted;
- final FileEncryptionInfo feInfo = isRawPath ? null :
- fsd.getFileEncryptionInfo(node, snapshot, iip);
+ final FileEncryptionInfo feInfo = isRawPath ? null : FSDirEncryptionZoneOp
+ .getFileEncryptionInfo(fsd, node, snapshot, iip);
+ final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
+ fsd.getFSNamesystem(), iip);
if (node.isFile()) {
final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b6b151c,4dda27d..a94b61c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -131,11 -131,9 +131,10 @@@ import org.apache.commons.logging.impl.
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
- import org.apache.hadoop.crypto.key.KeyProvider;
+ import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4143964,79a3773..e9363b4
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@@ -2057,18 -2043,6 +2061,18 @@@ class NameNodeRpcServer implements Name
public void removeSpanReceiver(long id) throws IOException {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
- nn.spanReceiverHost.removeSpanReceiver(id);
+ nn.tracerConfigurationManager.removeSpanReceiver(id);
}
+
+ @Override // ClientProtocol
+ public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
+ checkNNStartup();
+ return namesystem.getErasureCodingPolicies();
+ }
+
+ @Override // ClientProtocol
+ public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException {
+ checkNNStartup();
+ return namesystem.getErasureCodingPolicy(src);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 9d24b91,727259f..6dd7b89
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@@ -33,7 -33,7 +33,8 @@@ package hadoop.hdfs.datanode
import "HAServiceProtocol.proto";
import "hdfs.proto";
+import "erasurecoding.proto";
+ import "HdfsServer.proto";
/**
* Information to identify a datanode to a namenode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
index 0000000,3b60e51..66b2a33
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
@@@ -1,0 -1,201 +1,198 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+ // This file contains protocol buffers that are used throughout HDFS -- i.e.
+ // by the client, server, and data transfer protocols.
+
+
+ option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+ option java_outer_classname = "HdfsServerProtos";
+ option java_generate_equals_and_hash = true;
+ package hadoop.hdfs;
+
+ import "hdfs.proto";
+
+ /**
- * A list of storage IDs.
- */
-message StorageUuidsProto {
- repeated string storageUuids = 1;
-}
-
-/**
+ * Block access token information
+ */
+ message BlockKeyProto {
+ required uint32 keyId = 1; // Key identifier
+ required uint64 expiryDate = 2; // Expiry time in milliseconds
+ optional bytes keyBytes = 3; // Key secret
+ }
+
+ /**
+ * Current key and set of block keys at the namenode.
+ */
+ message ExportedBlockKeysProto {
+ required bool isBlockTokenEnabled = 1;
+ required uint64 keyUpdateInterval = 2;
+ required uint64 tokenLifeTime = 3;
+ required BlockKeyProto currentKey = 4;
+ repeated BlockKeyProto allKeys = 5;
+ }
+
+ /**
+ * Block and the datanodes where it is located
+ */
+ message BlockWithLocationsProto {
+ required BlockProto block = 1; // Block
+ repeated string datanodeUuids = 2; // Datanodes with replicas of the block
+ repeated string storageUuids = 3; // Storages with replicas of the block
+ repeated StorageTypeProto storageTypes = 4;
++
++ optional bytes indices = 5;
++ optional uint32 dataBlockNum = 6;
++ optional uint32 cellSize = 7;
+ }
+
+ /**
+ * List of block with locations
+ */
+ message BlocksWithLocationsProto {
+ repeated BlockWithLocationsProto blocks = 1;
+ }
+
+ /**
+ * Editlog information with available transactions
+ */
+ message RemoteEditLogProto {
+ required uint64 startTxId = 1; // Starting available edit log transaction
+ required uint64 endTxId = 2; // Ending available edit log transaction
+ optional bool isInProgress = 3 [default = false];
+ }
+
+ /**
+ * Enumeration of editlogs available on a remote namenode
+ */
+ message RemoteEditLogManifestProto {
+ repeated RemoteEditLogProto logs = 1;
+ }
+
+ /**
+ * Namespace information that describes namespace on a namenode
+ */
+ message NamespaceInfoProto {
+ required string buildVersion = 1; // Software revision version (e.g. an svn or git revision)
+ required uint32 unused = 2; // Retained for backward compatibility
+ required string blockPoolID = 3; // block pool used by the namespace
+ required StorageInfoProto storageInfo = 4;// Node information
+ required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
+ optional uint64 capabilities = 6 [default = 0]; // feature flags
+ }
+
+ /**
+ * State of a block replica at a datanode
+ */
+ enum ReplicaStateProto {
+ FINALIZED = 0; // State of a replica when it is not modified
+ RBW = 1; // State of replica that is being written to
+ RWR = 2; // State of replica that is waiting to be recovered
+ RUR = 3; // State of replica that is under recovery
+ TEMPORARY = 4; // State of replica that is created for replication
+ }
+
+ /**
+ * Block that needs to be recovered at a given location
+ */
+ message RecoveringBlockProto {
+ required uint64 newGenStamp = 1; // New genstamp post recovery
+ required LocatedBlockProto block = 2; // Block to be recovered
+ optional BlockProto truncateBlock = 3; // New block for recovery (truncate)
+ }
+
+ /**
+ * Unique signature to identify checkpoint transactions.
+ */
+ message CheckpointSignatureProto {
+ required string blockPoolId = 1;
+ required uint64 mostRecentCheckpointTxId = 2;
+ required uint64 curSegmentTxId = 3;
+ required StorageInfoProto storageInfo = 4;
+ }
+
+ /**
+ * Command returned from primary to checkpointing namenode.
+ * This command has checkpoint signature that identifies
+ * checkpoint transaction and is needed for further
+ * communication related to checkpointing.
+ */
+ message CheckpointCommandProto {
+ // Unique signature to identify checkpoint transaction
+ required CheckpointSignatureProto signature = 1;
+
+ // If true, return transfer image to primary upon the completion of checkpoint
+ required bool needToReturnImage = 2;
+ }
+
+ /**
+ * Command sent from one namenode to another namenode.
+ */
+ message NamenodeCommandProto {
+ enum Type {
+ NamenodeCommand = 0; // Base command
+ CheckPointCommand = 1; // Check point command
+ }
+ required uint32 action = 1;
+ required Type type = 2;
+ optional CheckpointCommandProto checkpointCmd = 3;
+ }
+
+ /**
+ * void request
+ */
+ message VersionRequestProto {
+ }
+
+ /**
+ * Version response from namenode.
+ */
+ message VersionResponseProto {
+ required NamespaceInfoProto info = 1;
+ }
+
+ /**
+ * Common node information shared by all the nodes in the cluster
+ */
+ message StorageInfoProto {
+ required uint32 layoutVersion = 1; // Layout version of the file system
+ required uint32 namespceID = 2; // File system namespace ID
+ required string clusterID = 3; // ID of the cluster
+ required uint64 cTime = 4; // File system creation time
+ }
+
+ /**
+ * Information sent by a namenode to identify itself to the primary namenode.
+ */
+ message NamenodeRegistrationProto {
+ required string rpcAddress = 1; // host:port of the namenode RPC address
+ required string httpAddress = 2; // host:port of the namenode http server
+ enum NamenodeRoleProto {
+ NAMENODE = 1;
+ BACKUP = 2;
+ CHECKPOINT = 3;
+ }
+ required StorageInfoProto storageInfo = 3; // Node information
+ optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index d1b16b1,50d548a..ce7aee3
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@@ -73,21 -74,17 +73,21 @@@ public class TestBlockTokenWithDFS
private static final String FILE_TO_READ = "/fileToRead.dat";
private static final String FILE_TO_WRITE = "/fileToWrite.dat";
private static final String FILE_TO_APPEND = "/fileToAppend.dat";
- private final byte[] rawData = new byte[FILE_SIZE];
{
- ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+ }
+
+ public static byte[] generateBytes(int fileSize){
Random r = new Random();
+ byte[] rawData = new byte[fileSize];
r.nextBytes(rawData);
+ return rawData;
}
- private void createFile(FileSystem fs, Path filename) throws IOException {
+ private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException {
FSDataOutputStream out = fs.create(filename);
- out.write(rawData);
+ out.write(expected);
out.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 917b0f2,df07a62..2bb3d5f
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@@ -55,9 -46,8 +54,10 @@@ import org.apache.hadoop.hdfs.server.bl
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 5efc94d,a84ddd0..6df88fd
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@@ -1660,60 -1657,4 +1662,60 @@@ public class TestFsck
}
}
}
+
+ @Test
+ public void testECFsck() throws Exception {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+ try {
+ Configuration conf = new HdfsConfiguration();
+ final long precision = 1L;
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
+ int totalSize = ErasureCodingPolicyManager.getSystemDefaultPolicy().getNumDataUnits()
+ + ErasureCodingPolicyManager.getSystemDefaultPolicy().getNumParityUnits();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(totalSize).build();
+ fs = cluster.getFileSystem();
+
+ // create a contiguous file
+ Path replDirPath = new Path("/replicated");
+ Path replFilePath = new Path(replDirPath, "replfile");
+ final short factor = 3;
+ DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0);
+ DFSTestUtil.waitReplication(fs, replFilePath, factor);
+
+ // create a large striped file
+ Path ecDirPath = new Path("/striped");
+ Path largeFilePath = new Path(ecDirPath, "largeFile");
+ DFSTestUtil.createStripedFile(cluster, largeFilePath, ecDirPath, 1, 2, true);
+
+ // create a small striped file
+ Path smallFilePath = new Path(ecDirPath, "smallFile");
+ DFSTestUtil.writeFile(fs, smallFilePath, "hello world!");
+
+ long replTime = fs.getFileStatus(replFilePath).getAccessTime();
+ long ecTime = fs.getFileStatus(largeFilePath).getAccessTime();
+ Thread.sleep(precision);
+ setupAuditLogs();
+ String outStr = runFsck(conf, 0, true, "/");
+ verifyAuditLogs();
+ assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime());
+ assertEquals(ecTime, fs.getFileStatus(largeFilePath).getAccessTime());
+ System.out.println(outStr);
+ assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+ if (fs != null) {try{fs.close();} catch(Exception e){}}
+ cluster.shutdown();
+
+ // restart the cluster; bring up namenode but not the data nodes
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(0).format(false).build();
+ outStr = runFsck(conf, 1, true, "/");
+ // expect the result is corrupt
+ assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+ System.out.println(outStr);
+ } finally {
+ if (fs != null) {try{fs.close();} catch(Exception e){}}
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
- }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------