Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:04 UTC
[04/50] [abbrv] hadoop git commit: Merge commit
'456e901a4c5c639267ee87b8e5f1319f256d20c2' (HDFS-6407. Add sorting and
pagination in the datanode tab of the NN Web UI. Contributed by Haohui Mai.)
into HDFS-7285-merge
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index b71e59e,0000000..4ca8fe6
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@@ -1,653 -1,0 +1,653 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * This class supports writing files in striped layout and erasure coded format.
+ * Each stripe contains a sequence of cells.
+ */
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+ static class MultipleBlockingQueue<T> {
+ private final List<BlockingQueue<T>> queues;
+
+ MultipleBlockingQueue(int numQueue, int queueSize) {
+ queues = new ArrayList<>(numQueue);
+ for (int i = 0; i < numQueue; i++) {
+ queues.add(new LinkedBlockingQueue<T>(queueSize));
+ }
+ }
+
+ boolean isEmpty() {
+ for(int i = 0; i < queues.size(); i++) {
+ if (!queues.get(i).isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int numQueues() {
+ return queues.size();
+ }
+
+ void offer(int i, T object) {
+ final boolean b = queues.get(i).offer(object);
+ Preconditions.checkState(b, "Failed to offer " + object
+ + " to queue, i=" + i);
+ }
+
+ T take(int i) throws InterruptedIOException {
+ try {
+ return queues.get(i).take();
+ } catch(InterruptedException ie) {
+ throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
+ }
+ }
+
+ T poll(int i) {
+ return queues.get(i).poll();
+ }
+
+ T peek(int i) {
+ return queues.get(i).peek();
+ }
+ }
+
+ /** Coordinate the communication between the streamers. */
+ class Coordinator {
+ private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
+ private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
+
+ private final MultipleBlockingQueue<LocatedBlock> newBlocks;
+ private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
+
+ Coordinator(final DfsClientConf conf, final int numDataBlocks,
+ final int numAllBlocks) {
+ followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+ endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
+
+ newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+ updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+ }
+
+ MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
+ return followingBlocks;
+ }
+
+ MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
+ return newBlocks;
+ }
+
+ MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
+ return updateBlocks;
+ }
+
+ StripedDataStreamer getStripedDataStreamer(int i) {
+ return DFSStripedOutputStream.this.getStripedDataStreamer(i);
+ }
+
+ void offerEndBlock(int i, ExtendedBlock block) {
+ endBlocks.offer(i, block);
+ }
+
+ ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
+ return endBlocks.take(i);
+ }
+
+ boolean hasAllEndBlocks() {
+ for(int i = 0; i < endBlocks.numQueues(); i++) {
+ if (endBlocks.peek(i) == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
+ ExtendedBlock b = endBlocks.peek(i);
+ if (b == null) {
+ // the streamer has just failed; offer the end block and continue
+ b = block;
+ offerEndBlock(i, b);
+ }
+ b.setNumBytes(newBytes);
+ }
+
+ /** @return a block representing the entire block group. */
+ ExtendedBlock getBlockGroup() {
+ final StripedDataStreamer s0 = getStripedDataStreamer(0);
+ final ExtendedBlock b0 = s0.getBlock();
+ if (b0 == null) {
+ return null;
+ }
+
+ final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
+ final ExtendedBlock block = new ExtendedBlock(b0);
+ long numBytes = b0.getNumBytes();
+ for (int i = 1; i < numDataBlocks; i++) {
+ final StripedDataStreamer si = getStripedDataStreamer(i);
+ final ExtendedBlock bi = si.getBlock();
+ if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
+ block.setGenerationStamp(bi.getGenerationStamp());
+ }
+ numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
+ }
+ block.setNumBytes(numBytes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
+ }
+ return block;
+ }
+ }
+
+ /** Buffers for writing the data and parity cells of a stripe. */
+ class CellBuffers {
+ private final ByteBuffer[] buffers;
+ private final byte[][] checksumArrays;
+
+ CellBuffers(int numParityBlocks) throws InterruptedException{
+ if (cellSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+ + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
+ }
+
+ checksumArrays = new byte[numParityBlocks][];
+ final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
+ for (int i = 0; i < checksumArrays.length; i++) {
+ checksumArrays[i] = new byte[size];
+ }
+
+ buffers = new ByteBuffer[numAllBlocks];
+ for (int i = 0; i < buffers.length; i++) {
+ buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+ }
+ }
+
+ private ByteBuffer[] getBuffers() {
+ return buffers;
+ }
+
+ byte[] getChecksumArray(int i) {
+ return checksumArrays[i - numDataBlocks];
+ }
+
+ private int addTo(int i, byte[] b, int off, int len) {
+ final ByteBuffer buf = buffers[i];
+ final int pos = buf.position() + len;
+ Preconditions.checkState(pos <= cellSize);
+ buf.put(b, off, len);
+ return pos;
+ }
+
+ private void clear() {
+ for (int i = 0; i< numAllBlocks; i++) {
+ buffers[i].clear();
+ if (i >= numDataBlocks) {
+ Arrays.fill(buffers[i].array(), (byte) 0);
+ }
+ }
+ }
+
+ private void release() {
+ for (int i = 0; i < numAllBlocks; i++) {
+ byteArrayManager.release(buffers[i].array());
+ }
+ }
+
+ private void flipDataBuffers() {
+ for (int i = 0; i < numDataBlocks; i++) {
+ buffers[i].flip();
+ }
+ }
+ }
+
+ private final Coordinator coordinator;
+ private final CellBuffers cellBuffers;
+ private final RawErasureEncoder encoder;
+ private final List<StripedDataStreamer> streamers;
+ private final DFSPacket[] currentPackets; // current Packet of each streamer
+
+ /** Size of each striping cell, must be a multiple of bytesPerChecksum */
+ private final int cellSize;
+ private final int numAllBlocks;
+ private final int numDataBlocks;
+
+ @Override
+ ExtendedBlock getBlock() {
+ return coordinator.getBlockGroup();
+ }
+
+ /** Construct a new output stream for creating a file. */
+ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ EnumSet<CreateFlag> flag, Progressable progress,
+ DataChecksum checksum, String[] favoredNodes)
+ throws IOException {
+ super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating DFSStripedOutputStream for " + src);
+ }
+
+ final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
+ final int numParityBlocks = ecPolicy.getNumParityUnits();
+ cellSize = ecPolicy.getCellSize();
+ numDataBlocks = ecPolicy.getNumDataUnits();
+ numAllBlocks = numDataBlocks + numParityBlocks;
+
+ encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
+ numDataBlocks, numParityBlocks);
+
+ coordinator = new Coordinator(dfsClient.getConf(),
+ numDataBlocks, numAllBlocks);
+ try {
+ cellBuffers = new CellBuffers(numParityBlocks);
+ } catch (InterruptedException ie) {
+ throw DFSUtil.toInterruptedIOException(
+ "Failed to create cell buffers", ie);
+ }
+
+ List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+ for (short i = 0; i < numAllBlocks; i++) {
+ StripedDataStreamer streamer = new StripedDataStreamer(stat,
+ dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+ favoredNodes, i, coordinator);
+ s.add(streamer);
+ }
+ streamers = Collections.unmodifiableList(s);
+ currentPackets = new DFSPacket[streamers.size()];
+ setCurrentStreamer(0);
+ }
+
+ StripedDataStreamer getStripedDataStreamer(int i) {
+ return streamers.get(i);
+ }
+
+ int getCurrentIndex() {
+ return getCurrentStreamer().getIndex();
+ }
+
+ private synchronized StripedDataStreamer getCurrentStreamer() {
+ return (StripedDataStreamer)streamer;
+ }
+
+ private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
+ throws IOException {
+ // backup currentPacket for current streamer
+ int oldIdx = streamers.indexOf(streamer);
+ if (oldIdx >= 0) {
+ currentPackets[oldIdx] = currentPacket;
+ }
+
+ streamer = streamers.get(newIdx);
+ currentPacket = currentPackets[newIdx];
+ adjustChunkBoundary();
+
+ return getCurrentStreamer();
+ }
+
+ /**
+ * Encode the buffers, i.e. compute parities.
+ *
+ * @param buffers data buffers + parity buffers
+ */
+ private static void encode(RawErasureEncoder encoder, int numData,
+ ByteBuffer[] buffers) {
+ final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+ final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+ System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+ System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+ encoder.encode(dataBuffers, parityBuffers);
+ }
+
+
+ private void checkStreamers() throws IOException {
+ int count = 0;
+ for(StripedDataStreamer s : streamers) {
+ if (!s.isFailed()) {
+ if (s.getBlock() != null) {
+ s.getErrorState().initExternalError();
+ }
+ count++;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("checkStreamers: " + streamers);
+ LOG.debug("count=" + count);
+ }
+ if (count < numDataBlocks) {
+ throw new IOException("Failed: the number of remaining blocks = "
+ + count + " < the number of data blocks = " + numDataBlocks);
+ }
+ }
+
+ private void handleStreamerFailure(String err,
+ Exception e) throws IOException {
+ LOG.warn("Failed: " + err + ", " + this, e);
+ getCurrentStreamer().setFailed(true);
+ checkStreamers();
+ currentPacket = null;
+ }
+
+ @Override
+ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
+ final int index = getCurrentIndex();
+ final StripedDataStreamer current = getCurrentStreamer();
+ final int pos = cellBuffers.addTo(index, bytes, offset, len);
+ final boolean cellFull = pos == cellSize;
+
+ final long oldBytes = current.getBytesCurBlock();
+ if (!current.isFailed()) {
+ try {
+ super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
+ } catch(Exception e) {
+ handleStreamerFailure("offset=" + offset + ", length=" + len, e);
+ }
+ }
+
+ if (current.isFailed()) {
+ final long newBytes = oldBytes + len;
+ coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
+ current.setBytesCurBlock(newBytes);
+ }
+
+ // Two extra steps are needed when a striping cell is full:
+ // 1. Forward the current index pointer
+ // 2. Generate parity packets if a full stripe of data cells is present
+ if (cellFull) {
+ int next = index + 1;
+ //When all data cells in a stripe are ready, we need to encode
+ //them and generate some parity cells. These cells will be
+ //converted to packets and put to their DataStreamer's queue.
+ if (next == numDataBlocks) {
+ cellBuffers.flipDataBuffers();
+ writeParityCells();
+ next = 0;
+ }
+ setCurrentStreamer(next);
+ }
+ }
+
+ private int stripeDataSize() {
+ return numDataBlocks * cellSize;
+ }
+
+ @Override
+ public void hflush() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void hsync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected synchronized void start() {
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.start();
+ }
+ }
+
+ @Override
+ synchronized void abort() throws IOException {
+ if (isClosed()) {
+ return;
+ }
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.getLastException().set(new IOException("Lease timeout of "
+ + (dfsClient.getConf().getHdfsTimeout()/1000) +
+ " seconds expired."));
+ }
+ closeThreads(true);
+ dfsClient.endFileLease(fileId);
+ }
+
+ @Override
+ boolean isClosed() {
+ if (closed) {
+ return true;
+ }
+ for(StripedDataStreamer s : streamers) {
+ if (!s.streamerClosed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void closeThreads(boolean force) throws IOException {
+ final MultipleIOException.Builder b = new MultipleIOException.Builder();
+ try {
+ for (StripedDataStreamer streamer : streamers) {
+ try {
+ streamer.close(force);
+ streamer.join();
+ streamer.closeSocket();
+ } catch (Exception e) {
+ try {
+ handleStreamerFailure("force=" + force, e);
+ } catch (IOException ioe) {
+ b.add(ioe);
+ }
+ } finally {
+ streamer.setSocketToNull();
+ }
+ }
+ } finally {
+ setClosed();
+ }
+ final IOException ioe = b.build();
+ if (ioe != null) {
+ throw ioe;
+ }
+ }
+
+ /**
+ * Sum bytesCurBlock over all data streamers. Note that the result is not
+ * necessarily the exact size of the block group.
+ */
+ private long getCurrentSumBytes() {
+ long sum = 0;
+ for (int i = 0; i < numDataBlocks; i++) {
+ sum += streamers.get(i).getBytesCurBlock();
+ }
+ return sum;
+ }
+
+ private void writeParityCellsForLastStripe() throws IOException {
+ final long currentBlockGroupBytes = getCurrentSumBytes();
+ if (currentBlockGroupBytes % stripeDataSize() == 0) {
+ return;
+ }
+
+ final int firstCellSize =
+ (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+ final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
+ firstCellSize : cellSize;
+ final ByteBuffer[] buffers = cellBuffers.getBuffers();
+
+ for (int i = 0; i < numAllBlocks; i++) {
+ // Pad zero bytes to make all cells exactly the size of parityCellSize
+ // If internal block is smaller than parity block, pad zero bytes.
+ // Also pad zero bytes to all parity cells
+ final int position = buffers[i].position();
+ assert position <= parityCellSize : "If an internal block is smaller" +
+ " than the parity block, then its last cell should be smaller than the" +
+ " last parity cell";
+ for (int j = 0; j < parityCellSize - position; j++) {
+ buffers[i].put((byte) 0);
+ }
+ buffers[i].flip();
+ }
+
+ writeParityCells();
+ }
+
+ void writeParityCells() throws IOException {
+ final ByteBuffer[] buffers = cellBuffers.getBuffers();
+ //encode the data cells
+ encode(encoder, numDataBlocks, buffers);
+ for (int i = numDataBlocks; i < numAllBlocks; i++) {
+ writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
+ }
+ cellBuffers.clear();
+ }
+
+ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
+ ) throws IOException {
+ final StripedDataStreamer current = setCurrentStreamer(index);
+ final int len = buffer.limit();
+
+ final long oldBytes = current.getBytesCurBlock();
+ if (!current.isFailed()) {
+ try {
+ DataChecksum sum = getDataChecksum();
+ sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+ for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+ int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+ int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+ super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+ getChecksumSize());
+ }
+ } catch(Exception e) {
+ handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
+ }
+ }
+
+ if (current.isFailed()) {
+ final long newBytes = oldBytes + len;
+ current.setBytesCurBlock(newBytes);
+ }
+ }
+
+ @Override
+ void setClosed() {
+ super.setClosed();
+ for (int i = 0; i < numAllBlocks; i++) {
+ streamers.get(i).release();
+ }
+ cellBuffers.release();
+ }
+
+ @Override
+ protected synchronized void closeImpl() throws IOException {
+ if (isClosed()) {
+ final MultipleIOException.Builder b = new MultipleIOException.Builder();
+ for(int i = 0; i < streamers.size(); i++) {
+ final StripedDataStreamer si = getStripedDataStreamer(i);
+ try {
+ si.getLastException().check(true);
+ } catch (IOException e) {
+ b.add(e);
+ }
+ }
+ final IOException ioe = b.build();
+ if (ioe != null) {
+ throw ioe;
+ }
+ return;
+ }
+
+ try {
+ // flush from all upper layers
+ try {
+ flushBuffer();
+ // if the last stripe is incomplete, generate and write parity cells
+ writeParityCellsForLastStripe();
+ enqueueAllCurrentPackets();
+ } catch(Exception e) {
+ handleStreamerFailure("closeImpl", e);
+ }
+
+ for (int i = 0; i < numAllBlocks; i++) {
+ final StripedDataStreamer s = setCurrentStreamer(i);
+ if (!s.isFailed()) {
+ try {
+ if (s.getBytesCurBlock() > 0) {
- setCurrentPacket2Empty();
++ setCurrentPacketToEmpty();
+ }
+ // flush all data to Datanode
+ flushInternal();
+ } catch(Exception e) {
+ handleStreamerFailure("closeImpl", e);
+ }
+ }
+ }
+
+ closeThreads(false);
+ final ExtendedBlock lastBlock = coordinator.getBlockGroup();
+ TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+ try {
+ completeFile(lastBlock);
+ } finally {
+ scope.close();
+ }
+ dfsClient.endFileLease(fileId);
+ } catch (ClosedChannelException ignored) {
+ } finally {
+ setClosed();
+ }
+ }
+
+ private void enqueueAllCurrentPackets() throws IOException {
+ int idx = streamers.indexOf(getCurrentStreamer());
+ for(int i = 0; i < streamers.size(); i++) {
+ setCurrentStreamer(i);
+ if (currentPacket != null) {
+ enqueueCurrentPacket();
+ }
+ }
+ setCurrentStreamer(idx);
+ }
+}
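
A side note on the striping logic above: writeChunk() accumulates bytes into
per-streamer cell buffers, and once a full stripe of data cells is buffered,
writeParityCells() flips the data buffers and passes the combined
(data + parity) buffer array to the raw erasure encoder. Below is a minimal
standalone sketch of that buffer-splitting step; the real stream obtains a
Reed-Solomon RawErasureEncoder from CodecUtil, so the single-parity XOR used
here is only a stand-in to keep the example self-contained.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class StripeEncodeSketch {
      // Stand-in encoder: XOR every data cell into the single parity cell.
      static void encode(ByteBuffer[] data, ByteBuffer[] parity) {
        byte[] out = new byte[parity[0].remaining()];
        for (ByteBuffer cell : data) {
          for (int i = 0; i < out.length; i++) {
            out[i] ^= cell.get(cell.position() + i);
          }
        }
        parity[0].put(out).flip();
      }

      public static void main(String[] args) {
        final int numData = 3, cellSize = 4;
        ByteBuffer[] buffers = new ByteBuffer[numData + 1];
        for (int i = 0; i < buffers.length; i++) {
          buffers[i] = ByteBuffer.allocate(cellSize);
        }
        // Fill the data cells of one stripe and flip them for reading,
        // mirroring CellBuffers.flipDataBuffers().
        for (int i = 0; i < numData; i++) {
          for (int j = 0; j < cellSize; j++) {
            buffers[i].put((byte) (i + j));
          }
          buffers[i].flip();
        }
        // Split into data and parity views, as encode(encoder, numData, buffers)
        // does with System.arraycopy before calling encoder.encode(...).
        ByteBuffer[] data = new ByteBuffer[numData];
        ByteBuffer[] parity = new ByteBuffer[buffers.length - numData];
        System.arraycopy(buffers, 0, data, 0, numData);
        System.arraycopy(buffers, numData, parity, 0, parity.length);
        encode(data, parity);
        System.out.println("parity cell = " + Arrays.toString(parity[0].array()));
      }
    }
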
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d55d00b,8e81fdc..50a367a
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@@ -199,12 -200,7 +201,13 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index f2facd7,4ca5b26..c083b5e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -3124,193 -3067,7 +3141,194 @@@ public class PBHelper
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
+ setLeaseId(context.getLeaseId()).
build();
}
+
+ public static ECSchema convertECSchema(ECSchemaProto schema) {
+ List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+ Map<String, String> options = new HashMap<>(optionsList.size());
+ for (ECSchemaOptionEntryProto option : optionsList) {
+ options.put(option.getKey(), option.getValue());
+ }
+ return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
+ schema.getParityUnits(), options);
+ }
+
+ public static ECSchemaProto convertECSchema(ECSchema schema) {
+ ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+ .setCodecName(schema.getCodecName())
+ .setDataUnits(schema.getNumDataUnits())
+ .setParityUnits(schema.getNumParityUnits());
+ Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
+ for (Entry<String, String> entry : entrySet) {
+ builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+ .setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
+
+ public static ErasureCodingPolicy convertErasureCodingPolicy(
+ ErasureCodingPolicyProto policy) {
+ return new ErasureCodingPolicy(policy.getName(),
+ convertECSchema(policy.getSchema()),
+ policy.getCellSize());
+ }
+
+ public static ErasureCodingPolicyProto convertErasureCodingPolicy(
+ ErasureCodingPolicy policy) {
+ ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
+ .newBuilder()
+ .setName(policy.getName())
+ .setSchema(convertECSchema(policy.getSchema()))
+ .setCellSize(policy.getCellSize());
+ return builder.build();
+ }
+
+ public static ErasureCodingZoneProto convertErasureCodingZone(
+ ErasureCodingZone ecZone) {
+ return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
+ .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy()))
+ .build();
+ }
+
+ public static ErasureCodingZone convertErasureCodingZone(
+ ErasureCodingZoneProto ecZoneProto) {
+ return new ErasureCodingZone(ecZoneProto.getDir(),
+ convertErasureCodingPolicy(ecZoneProto.getEcPolicy()));
+ }
+
+ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+ BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+ ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
+ ExtendedBlock block = convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+ .getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+ .getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
+
+ StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+ .getTargetStorageUuids();
+ String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+ StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+ .getTargetStorageTypes();
+ StorageType[] convertStorageTypes = convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+ .getStorageTypesList().size());
+
+ List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+ .getLiveBlockIndicesList();
+ short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+ for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+ liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+ }
+
+ ErasureCodingPolicy ecPolicy =
+ convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
+
+ return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+ targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
+ }
+
+ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+ BlockECRecoveryInfo blockEcRecoveryInfo) {
+ BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+ .newBuilder();
+ builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+ builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+ StorageType[] targetStorageTypes = blockEcRecoveryInfo
+ .getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+ builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
+ builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
+ .getErasureCodingPolicy()));
+
+ return builder.build();
+ }
+
+ private static List<Integer> convertIntArray(short[] liveBlockIndices) {
+ List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
+ for (short s : liveBlockIndices) {
+ liveBlockIndicesList.add((int) s);
+ }
+ return liveBlockIndicesList;
+ }
+
+ private static StorageTypesProto convertStorageTypesProto(
+ StorageType[] targetStorageTypes) {
+ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
+ for (StorageType storageType : targetStorageTypes) {
+ builder.addStorageTypes(convertStorageType(storageType));
+ }
+ return builder.build();
+ }
+
+ private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
+ StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
+ for (String storageUuid : targetStorageIDs) {
+ builder.addStorageUuids(storageUuid);
+ }
+ return builder.build();
+ }
+
+ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
+ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+ for (DatanodeInfo datanodeInfo : dnInfos) {
+ builder.addDatanodes(convert(datanodeInfo));
+ }
+ return builder.build();
+ }
+
+ private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
+ List<String> storageUuidsList = targetStorageUuidsProto
+ .getStorageUuidsList();
+ String[] storageUuids = new String[storageUuidsList.size()];
+ for (int i = 0; i < storageUuidsList.size(); i++) {
+ storageUuids[i] = storageUuidsList.get(i);
+ }
+ return storageUuids;
+ }
+
+ public static BlockECRecoveryCommandProto convert(
+ BlockECRecoveryCommand blkECRecoveryCmd) {
+ BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+ .newBuilder();
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+ .getECTasks();
+ for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+ builder
+ .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+ }
+ return builder.build();
+ }
+
+ public static BlockECRecoveryCommand convert(
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
+ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
+ List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+ .getBlockECRecoveryinfoList();
+ for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+ blkECRecoveryInfos
+ .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+ }
+ return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+ blkECRecoveryInfos);
+ }
}
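
The BlockECRecoveryInfo converters above also translate the indices of live
internal blocks between the short[] kept in memory and the repeated int32
field of the protobuf message (see convertIntArray and the loop over
getLiveBlockIndicesList()). A minimal standalone round trip of that widening
and narrowing, with a plain List standing in for the repeated field since the
generated protobuf classes are omitted here, could look like this:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class LiveBlockIndicesSketch {
      // Widen short[] -> List<Integer>, as when building the proto message.
      static List<Integer> toIntList(short[] indices) {
        List<Integer> list = new ArrayList<>(indices.length);
        for (short s : indices) {
          list.add((int) s);
        }
        return list;
      }

      // Narrow List<Integer> -> short[], as when reading the proto message back.
      static short[] toShortArray(List<Integer> list) {
        short[] indices = new short[list.size()];
        for (int i = 0; i < list.size(); i++) {
          indices[i] = list.get(i).shortValue();
        }
        return indices;
      }

      public static void main(String[] args) {
        short[] live = {0, 2, 3, 5, 8};
        short[] roundTrip = toShortArray(toIntList(live));
        System.out.println(Arrays.toString(roundTrip)); // [0, 2, 3, 5, 8]
      }
    }
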
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 930001a,f9847ca..555f506
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@@ -291,10 -310,7 +316,8 @@@ public class Dispatcher
/** Dispatch the move to the proxy source & wait for the response. */
private void dispatch() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start moving " + this);
- }
+ LOG.info("Start moving " + this);
+ assert !(reportedBlock instanceof DBlockStriped);
Socket sock = new Socket();
DataOutputStream out = null;
@@@ -323,7 -339,8 +346,8 @@@
sendRequest(out, eb, accessToken);
receiveResponse(in);
- nnc.getBytesMoved().addAndGet(block.getNumBytes());
+ nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
+ target.getDDatanode().setHasSuccess();
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
@@@ -656,29 -650,25 +695,39 @@@
* @return the total size of the received blocks in the number of bytes.
*/
private long getBlockList() throws IOException {
- final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+ final long size = Math.min(getBlocksSize, blocksToReceive);
- final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+ final BlocksWithLocations newBlksLocs =
+ nnc.getBlocks(getDatanodeInfo(), size);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+ + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
- + ") returns " + newBlocks.getBlocks().length + " blocks.");
++ + ") returns " + newBlksLocs.getBlocks().length + " blocks.");
+ }
+
long bytesReceived = 0;
- for (BlockWithLocations blk : newBlocks.getBlocks()) {
+ for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
+ // Skip small blocks.
- if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
++ if (blkLocs.getBlock().getNumBytes() < getBlocksMinBlockSize) {
+ continue;
+ }
- bytesReceived += blk.getBlock().getNumBytes();
+ DBlock block;
+ if (blkLocs instanceof StripedBlockWithLocations) {
+ StripedBlockWithLocations sblkLocs =
+ (StripedBlockWithLocations) blkLocs;
+ // approximate size
+ bytesReceived += sblkLocs.getBlock().getNumBytes() /
+ sblkLocs.getDataBlockNum();
+ block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
+ sblkLocs.getDataBlockNum());
+ } else{
+ bytesReceived += blkLocs.getBlock().getNumBytes();
+ block = new DBlock(blkLocs.getBlock());
+ }
+
synchronized (globalBlocks) {
- final DBlock block = globalBlocks.get(blk.getBlock());
+ block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
synchronized (block) {
block.clearLocations();
@@@ -944,8 -954,22 +1015,21 @@@
return new DDatanode(datanode, maxConcurrentMovesPerNode);
}
+
public void executePendingMove(final PendingMove p) {
- // move the block
+ // move the reportedBlock
+ final DDatanode targetDn = p.target.getDDatanode();
+ ExecutorService moveExecutor = targetDn.getMoveExecutor();
+ if (moveExecutor == null) {
+ final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+ if (nThreads > 0) {
+ moveExecutor = targetDn.initMoveExecutor(nThreads);
+ }
+ }
+ if (moveExecutor == null) {
+ LOG.warn("No mover threads available: skip moving " + p);
+ return;
+ }
-
moveExecutor.execute(new Runnable() {
@Override
public void run() {
@@@ -996,11 -1020,8 +1080,8 @@@
return getBytesMoved() - bytesLastMoved;
}
- /** The sleeping period before checking if reportedBlock move is completed again */
- static private long blockMoveWaitTime = 30000L;
-
/**
- * Wait for all block move confirmations.
+ * Wait for all reportedBlock move confirmations.
* @return true if there is failed move execution
*/
public static boolean waitForMoveCompletion(
@@@ -1027,10 -1048,22 +1108,22 @@@
}
/**
+ * @return true if some moves are success.
+ */
+ public static boolean checkForSuccess(
+ Iterable<? extends StorageGroup> targets) {
+ boolean hasSuccess = false;
+ for (StorageGroup t : targets) {
+ hasSuccess |= t.getDDatanode().hasSuccess;
+ }
+ return hasSuccess;
+ }
+
+ /**
- * Decide if the block is a good candidate to be moved from source to target.
- * A block is a good candidate if
+ * Decide if the block/blockGroup is a good candidate to be moved from source
+ * to target. A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
- * 2. the block does not have a replica on the target;
+ * 2. the block does not have a replica/internalBlock on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
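
One detail worth noting in the getBlockList() change above: for a
StripedBlockWithLocations the dispatcher accounts for the received bytes only
approximately, dividing the block group size by the number of data blocks
(the "// approximate size" branch). A tiny standalone sketch of that
accounting, using hypothetical RS(6,3) numbers:

    public class StripedSizeSketch {
      // Approximate size of one internal block of a striped block group,
      // as in the StripedBlockWithLocations branch of getBlockList().
      static long approxInternalBlockSize(long blockGroupBytes, int dataBlockNum) {
        return blockGroupBytes / dataBlockNum;
      }

      public static void main(String[] args) {
        // Hypothetical group holding 384 MB of data across 6 data blocks:
        // each internal data block is roughly 64 MB.
        long groupBytes = 384L * 1024 * 1024;
        System.out.println(approxInternalBlockSize(groupBytes, 6)); // 67108864
      }
    }
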
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 4308278,dea31c4..bf11914
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@@ -19,19 -19,21 +19,20 @@@ package org.apache.hadoop.hdfs.server.b
import java.util.LinkedList;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
- import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.LightWeightGSet;
/**
- * BlockInfo class maintains for a given block
- * the {@link BlockCollection} it is part of and datanodes where the replicas of
- * the block are stored.
+ * For a given block (or an erasure coding block group), BlockInfo class
+ * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
+ * where the replicas of the block, or blocks belonging to the erasure coding
+ * block group, are stored.
*/
-@InterfaceAudience.Private
-public abstract class BlockInfo extends Block
+public abstract class BlockInfo extends Block
implements LightWeightGSet.LinkedElement {
public static final BlockInfo[] EMPTY_ARRAY = {};
+
private BlockCollection bc;
/** For implementing {@link LightWeightGSet.LinkedElement} interface */
@@@ -177,27 -188,7 +178,12 @@@
*/
abstract void replaceBlock(BlockInfo newBlock);
+ public abstract boolean isStriped();
+
+ /** @return true if there is no datanode storage associated with the block */
+ abstract boolean hasNoStorage();
+
/**
- * Find specified DatanodeDescriptor.
- * @return index or -1 if not found.
- */
- boolean findDatanode(DatanodeDescriptor dn) {
- int len = getCapacity();
- for (int idx = 0; idx < len; idx++) {
- DatanodeDescriptor cur = getDatanode(idx);
- if(cur == dn) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.
*/
@@@ -303,27 -294,43 +289,26 @@@
/**
* BlockInfo represents a block that is not being constructed.
- * In order to start modifying the block, the BlockInfo should be converted
- * to {@link BlockInfoContiguousUnderConstruction}.
+ * In order to start modifying the block, the BlockInfo should be converted to
- * {@link BlockInfoUnderConstructionContiguous} or
- * {@link BlockInfoUnderConstructionStriped}.
- * @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
++ * {@link BlockInfoContiguousUnderConstruction} or
++ * {@link BlockInfoStripedUnderConstruction}.
+ * @return {@link BlockUCState#COMPLETE}
*/
- public HdfsServerConstants.BlockUCState getBlockUCState() {
- return HdfsServerConstants.BlockUCState.COMPLETE;
+ public BlockUCState getBlockUCState() {
+ return BlockUCState.COMPLETE;
}
/**
* Is this block complete?
*
- * @return true if the state of the block is
- * {@link HdfsServerConstants.BlockUCState#COMPLETE}
+ * @return true if the state of the block is {@link BlockUCState#COMPLETE}
*/
public boolean isComplete() {
- return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE);
+ return getBlockUCState().equals(BlockUCState.COMPLETE);
}
- /**
- * Convert a complete block to an under construction block.
- * @return BlockInfoUnderConstruction - an under construction block.
- */
- public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
- BlockUCState s, DatanodeStorageInfo[] targets) {
- if(isComplete()) {
- BlockInfoContiguousUnderConstruction ucBlock =
- new BlockInfoContiguousUnderConstruction(this,
- getBlockCollection().getPreferredBlockReplication(), s, targets);
- ucBlock.setBlockCollection(getBlockCollection());
- return ucBlock;
- }
- // the block is already under construction
- BlockInfoContiguousUnderConstruction ucBlock =
- (BlockInfoContiguousUnderConstruction)this;
- ucBlock.setBlockUCState(s);
- ucBlock.setExpectedLocations(targets);
- ucBlock.setBlockCollection(getBlockCollection());
- return ucBlock;
+ public boolean isDeleted() {
+ return (bc == null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index d9adccc,eff89a8..bb9bf5b
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@@ -122,36 -120,4 +122,36 @@@ public class BlockInfoContiguous extend
"newBlock already exists.";
}
}
+
+ /**
+ * Convert a complete block to an under construction block.
+ * @return BlockInfoUnderConstruction - an under construction block.
+ */
- public BlockInfoUnderConstructionContiguous convertToBlockUnderConstruction(
++ public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
+ BlockUCState s, DatanodeStorageInfo[] targets) {
+ if(isComplete()) {
- BlockInfoUnderConstructionContiguous ucBlock =
- new BlockInfoUnderConstructionContiguous(this,
++ BlockInfoContiguousUnderConstruction ucBlock =
++ new BlockInfoContiguousUnderConstruction(this,
+ getBlockCollection().getPreferredBlockReplication(), s, targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ return ucBlock;
+ }
+ // the block is already under construction
- BlockInfoUnderConstructionContiguous ucBlock =
- (BlockInfoUnderConstructionContiguous) this;
++ BlockInfoContiguousUnderConstruction ucBlock =
++ (BlockInfoContiguousUnderConstruction) this;
+ ucBlock.setBlockUCState(s);
+ ucBlock.setExpectedLocations(targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ return ucBlock;
+ }
+
+ @Override
+ public final boolean isStriped() {
+ return false;
+ }
+
+ @Override
+ final boolean hasNoStorage() {
+ return getStorageInfo(0) == null;
+ }
}
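
convertToBlockUnderConstruction() above follows a simple pattern: a COMPLETE
block is wrapped into a fresh under-construction object, while a block that
is already under construction is refreshed in place and returned as-is. A
schematic, self-contained sketch of that pattern using hypothetical
pared-down classes (the real ones also carry replication, expected locations
and the BlockCollection):

    public class ConvertToUcSketch {
      enum BlockUCState { COMPLETE, UNDER_CONSTRUCTION }

      static class CompleteBlock {
        BlockUCState state() { return BlockUCState.COMPLETE; }
        boolean isComplete() { return state() == BlockUCState.COMPLETE; }

        // Complete -> wrap into a new UC object; already UC -> update in place.
        UcBlock convertToUnderConstruction() {
          if (isComplete()) {
            return new UcBlock();
          }
          UcBlock uc = (UcBlock) this;
          uc.refresh();
          return uc;
        }
      }

      static class UcBlock extends CompleteBlock {
        @Override BlockUCState state() { return BlockUCState.UNDER_CONSTRUCTION; }
        // In the real code this re-sets the UC state and expected locations.
        void refresh() { }
      }

      public static void main(String[] args) {
        System.out.println(new CompleteBlock().convertToUnderConstruction().state());
        System.out.println(new UcBlock().convertToUnderConstruction().state());
      }
    }
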
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index 0000000,7ca6419..96b209d
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@@ -1,0 -1,403 +1,281 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.List;
+
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+ import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+ /**
+ * Represents a block that is currently being constructed.<br>
+ * This is usually the last block of a file opened for write or append.
+ */
-public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
++public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
++ implements BlockInfoUnderConstruction{
+ /** Block state. See {@link BlockUCState} */
+ private BlockUCState blockUCState;
+
+ /**
+ * Block replicas as assigned when the block was allocated.
+ * This defines the pipeline order.
+ */
+ private List<ReplicaUnderConstruction> replicas;
+
+ /**
+ * Index of the primary data node doing the recovery. Useful for log
+ * messages.
+ */
+ private int primaryNodeIndex = -1;
+
+ /**
+ * The new generation stamp, which this block will have
+ * after the recovery succeeds. Also used as a recovery id to identify
+ * the right recovery if any of the abandoned recoveries re-appear.
+ */
+ private long blockRecoveryId = 0;
+
+ /**
+ * The block source to use in the event of copy-on-write truncate.
+ */
+ private Block truncateBlock;
+
+ /**
- * ReplicaUnderConstruction contains information about replicas while
- * they are under construction.
- * The GS, the length and the state of the replica is as reported by
- * the data-node.
- * It is not guaranteed, but expected, that data-nodes actually have
- * corresponding replicas.
- */
- static class ReplicaUnderConstruction extends Block {
- private final DatanodeStorageInfo expectedLocation;
- private ReplicaState state;
- private boolean chosenAsPrimary;
-
- ReplicaUnderConstruction(Block block,
- DatanodeStorageInfo target,
- ReplicaState state) {
- super(block);
- this.expectedLocation = target;
- this.state = state;
- this.chosenAsPrimary = false;
- }
-
- /**
- * Expected block replica location as assigned when the block was allocated.
- * This defines the pipeline order.
- * It is not guaranteed, but expected, that the data-node actually has
- * the replica.
- */
- private DatanodeStorageInfo getExpectedStorageLocation() {
- return expectedLocation;
- }
-
- /**
- * Get replica state as reported by the data-node.
- */
- ReplicaState getState() {
- return state;
- }
-
- /**
- * Whether the replica was chosen for recovery.
- */
- boolean getChosenAsPrimary() {
- return chosenAsPrimary;
- }
-
- /**
- * Set replica state.
- */
- void setState(ReplicaState s) {
- state = s;
- }
-
- /**
- * Set whether this replica was chosen for recovery.
- */
- void setChosenAsPrimary(boolean chosenAsPrimary) {
- this.chosenAsPrimary = chosenAsPrimary;
- }
-
- /**
- * Is data-node the replica belongs to alive.
- */
- boolean isAlive() {
- return expectedLocation.getDatanodeDescriptor().isAlive;
- }
-
- @Override // Block
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override // Block
- public boolean equals(Object obj) {
- // Sufficient to rely on super's implementation
- return (this == obj) || super.equals(obj);
- }
-
- @Override
- public String toString() {
- final StringBuilder b = new StringBuilder(50);
- appendStringTo(b);
- return b.toString();
- }
-
- @Override
- public void appendStringTo(StringBuilder sb) {
- sb.append("ReplicaUC[")
- .append(expectedLocation)
- .append("|")
- .append(state)
- .append("]");
- }
- }
-
- /**
+ * Create block and set its state to
+ * {@link BlockUCState#UNDER_CONSTRUCTION}.
+ */
+ public BlockInfoContiguousUnderConstruction(Block blk, short replication) {
+ this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
+ }
+
+ /**
+ * Create a block that is currently being constructed.
+ */
+ public BlockInfoContiguousUnderConstruction(Block blk, short replication,
+ BlockUCState state, DatanodeStorageInfo[] targets) {
+ super(blk, replication);
+ assert getBlockUCState() != BlockUCState.COMPLETE :
- "BlockInfoUnderConstruction cannot be in COMPLETE state";
++ "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state";
+ this.blockUCState = state;
+ setExpectedLocations(targets);
+ }
+
- /**
- * Convert an under construction block to a complete block.
- *
- * @return BlockInfo - a complete block.
- * @throws IOException if the state of the block
- * (the generation stamp and the length) has not been committed by
- * the client or it does not have at least a minimal number of replicas
- * reported from data-nodes.
- */
- BlockInfo convertToCompleteBlock() throws IOException {
++ @Override
++ public BlockInfoContiguous convertToCompleteBlock() throws IOException {
+ assert getBlockUCState() != BlockUCState.COMPLETE :
+ "Trying to convert a COMPLETE block";
+ return new BlockInfoContiguous(this);
+ }
+
- /** Set expected locations */
++ @Override
+ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
+ int numLocations = targets == null ? 0 : targets.length;
- this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
- for(int i = 0; i < numLocations; i++)
- replicas.add(
- new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
++ this.replicas = new ArrayList<>(numLocations);
++ for(int i = 0; i < numLocations; i++) {
++ replicas.add(new ReplicaUnderConstruction(this, targets[i],
++ ReplicaState.RBW));
++ }
+ }
+
- /**
- * Create array of expected replica locations
- * (as has been assigned by chooseTargets()).
- */
++ @Override
+ public DatanodeStorageInfo[] getExpectedStorageLocations() {
+ int numLocations = replicas == null ? 0 : replicas.size();
+ DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
- for(int i = 0; i < numLocations; i++)
++ for (int i = 0; i < numLocations; i++) {
+ storages[i] = replicas.get(i).getExpectedStorageLocation();
++ }
+ return storages;
+ }
+
- /** Get the number of expected locations */
++ @Override
+ public int getNumExpectedLocations() {
+ return replicas == null ? 0 : replicas.size();
+ }
+
+ /**
+ * Return the state of the block under construction.
+ * @see BlockUCState
+ */
+ @Override // BlockInfo
+ public BlockUCState getBlockUCState() {
+ return blockUCState;
+ }
+
+ void setBlockUCState(BlockUCState s) {
+ blockUCState = s;
+ }
+
- /** Get block recovery ID */
++ @Override
+ public long getBlockRecoveryId() {
+ return blockRecoveryId;
+ }
+
- /** Get recover block */
++ @Override
+ public Block getTruncateBlock() {
+ return truncateBlock;
+ }
+
++ @Override
++ public Block toBlock(){
++ return this;
++ }
++
+ public void setTruncateBlock(Block recoveryBlock) {
+ this.truncateBlock = recoveryBlock;
+ }
+
- /**
- * Process the recorded replicas. When about to commit or finish the
- * pipeline recovery sort out bad replicas.
- * @param genStamp The final generation stamp for the block.
- */
++ @Override
+ public void setGenerationStampAndVerifyReplicas(long genStamp) {
+ // Set the generation stamp for the block.
+ setGenerationStamp(genStamp);
+ if (replicas == null)
+ return;
+
+ // Remove the replicas with wrong gen stamp.
+ // The replica list is unchanged.
+ for (ReplicaUnderConstruction r : replicas) {
+ if (genStamp != r.getGenerationStamp()) {
+ r.getExpectedStorageLocation().removeBlock(this);
+ NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
+ + "from location: {}", r.getExpectedStorageLocation());
+ }
+ }
+ }
+
- /**
- * Commit block's length and generation stamp as reported by the client.
- * Set block state to {@link BlockUCState#COMMITTED}.
- * @param block - contains client reported block length and generation
- * @throws IOException if block ids are inconsistent.
- */
- void commitBlock(Block block) throws IOException {
++ @Override
++ public void commitBlock(Block block) throws IOException {
+ if(getBlockId() != block.getBlockId())
+ throw new IOException("Trying to commit inconsistent block: id = "
+ + block.getBlockId() + ", expected id = " + getBlockId());
+ blockUCState = BlockUCState.COMMITTED;
+ this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+ // Sort out invalid replicas.
+ setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
+ }
+
- /**
- * Initialize lease recovery for this block.
- * Find the first alive data-node starting from the previous primary and
- * make it primary.
- */
++ @Override
+ public void initializeBlockRecovery(long recoveryId) {
+ setBlockUCState(BlockUCState.UNDER_RECOVERY);
+ blockRecoveryId = recoveryId;
+ if (replicas.size() == 0) {
+ NameNode.blockStateChangeLog.warn("BLOCK*"
- + " BlockInfoUnderConstruction.initLeaseRecovery:"
++ + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:"
+ + " No blocks found, lease removed.");
+ }
+ boolean allLiveReplicasTriedAsPrimary = true;
- for (int i = 0; i < replicas.size(); i++) {
++ for (ReplicaUnderConstruction replica : replicas) {
+ // Check if all replicas have been tried or not.
- if (replicas.get(i).isAlive()) {
- allLiveReplicasTriedAsPrimary =
- (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
++ if (replica.isAlive()) {
++ allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
++ replica.getChosenAsPrimary());
+ }
+ }
+ if (allLiveReplicasTriedAsPrimary) {
+ // Just set all the replicas to be chosen whether they are alive or not.
- for (int i = 0; i < replicas.size(); i++) {
- replicas.get(i).setChosenAsPrimary(false);
++ for (ReplicaUnderConstruction replica : replicas) {
++ replica.setChosenAsPrimary(false);
+ }
+ }
+ long mostRecentLastUpdate = 0;
+ ReplicaUnderConstruction primary = null;
+ primaryNodeIndex = -1;
+ for(int i = 0; i < replicas.size(); i++) {
+ // Skip alive replicas which have been chosen for recovery.
+ if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+ continue;
+ }
+ final ReplicaUnderConstruction ruc = replicas.get(i);
+ final long lastUpdate = ruc.getExpectedStorageLocation()
+ .getDatanodeDescriptor().getLastUpdateMonotonic();
+ if (lastUpdate > mostRecentLastUpdate) {
+ primaryNodeIndex = i;
+ primary = ruc;
+ mostRecentLastUpdate = lastUpdate;
+ }
+ }
+ if (primary != null) {
- primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
++ primary.getExpectedStorageLocation().getDatanodeDescriptor()
++ .addBlockToBeRecovered(this);
+ primary.setChosenAsPrimary(true);
+ NameNode.blockStateChangeLog.debug(
+ "BLOCK* {} recovery started, primary={}", this, primary);
+ }
+ }
+
- void addReplicaIfNotPresent(DatanodeStorageInfo storage,
- Block block,
- ReplicaState rState) {
++ @Override
++ public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
++ Block block, ReplicaState rState) {
+ Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+ while (it.hasNext()) {
+ ReplicaUnderConstruction r = it.next();
+ DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
+ if(expectedLocation == storage) {
+ // Record the gen stamp from the report
+ r.setGenerationStamp(block.getGenerationStamp());
+ return;
+ } else if (expectedLocation != null &&
+ expectedLocation.getDatanodeDescriptor() ==
+ storage.getDatanodeDescriptor()) {
+
+ // The Datanode reported that the block is on a different storage
+ // than the one chosen by BlockPlacementPolicy. This can occur as
+ // we allow Datanodes to choose the target storage. Update our
+ // state by removing the stale entry and adding a new one.
+ it.remove();
+ break;
+ }
+ }
+ replicas.add(new ReplicaUnderConstruction(block, storage, rState));
+ }
+
- @Override // BlockInfo
- // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override // BlockInfo
- public boolean equals(Object obj) {
- // Sufficient to rely on super's implementation
- return (this == obj) || super.equals(obj);
- }
-
+ @Override
+ public String toString() {
+ final StringBuilder b = new StringBuilder(100);
+ appendStringTo(b);
+ return b.toString();
+ }
+
+ @Override
+ public void appendStringTo(StringBuilder sb) {
+ super.appendStringTo(sb);
+ appendUCParts(sb);
+ }
+
+ private void appendUCParts(StringBuilder sb) {
+ sb.append("{UCState=").append(blockUCState)
+ .append(", truncateBlock=" + truncateBlock)
+ .append(", primaryNodeIndex=").append(primaryNodeIndex)
+ .append(", replicas=[");
+ if (replicas != null) {
+ Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+ if (iter.hasNext()) {
+ iter.next().appendStringTo(sb);
+ while (iter.hasNext()) {
+ sb.append(", ");
+ iter.next().appendStringTo(sb);
+ }
+ }
+ }
+ sb.append("]}");
+ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index b88b554,0000000..14d2fcc
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,279 -1,0 +1,279 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
+/**
+ * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
+ *
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * are sorted and strictly mapped to the corresponding block.
+ *
+ * Normally each block belonging to the group is stored in only one DataNode.
+ * However, it is possible that some block is over-replicated. Thus the triplet
+ * array's size can be larger than (m+k), and currently we use an extra byte
+ * array to record the block index for each triplet.
+ */
+public class BlockInfoStriped extends BlockInfo {
+ private final ErasureCodingPolicy ecPolicy;
+ /**
+ * Always the same size as triplets. Records the block index for each triplet.
+ * TODO: this is only necessary for over-replicated blocks, so it can be
+ * further optimized to save memory usage.
+ */
+ private byte[] indices;
+
+ public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
+ super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
+ indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
+ initIndices();
+ this.ecPolicy = ecPolicy;
+ }
+
+ BlockInfoStriped(BlockInfoStriped b) {
+ this(b, b.getErasureCodingPolicy());
+ this.setBlockCollection(b.getBlockCollection());
+ }
+
+ public short getTotalBlockNum() {
+ return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+ }
+
+ public short getDataBlockNum() {
+ return (short) ecPolicy.getNumDataUnits();
+ }
+
+ public short getParityBlockNum() {
+ return (short) ecPolicy.getNumParityUnits();
+ }
+
+ /**
+ * If the block is committed/completed and its length is less than a full
+ * stripe, it returns the number of actual data blocks.
+ * Otherwise it returns the number of data units specified by the erasure
+ * coding policy.
+ */
+ public short getRealDataBlockNum() {
+ if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
+ return (short) Math.min(getDataBlockNum(),
+ (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
+ } else {
+ return getDataBlockNum();
+ }
+ }
+
+ public short getRealTotalBlockNum() {
+ return (short) (getRealDataBlockNum() + getParityBlockNum());
+ }
+
+ public ErasureCodingPolicy getErasureCodingPolicy() {
+ return ecPolicy;
+ }
+
+ private void initIndices() {
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = -1;
+ }
+ }
+
+ private int findSlot() {
+ int i = getTotalBlockNum();
+ for (; i < getCapacity(); i++) {
+ if (getStorageInfo(i) == null) {
+ return i;
+ }
+ }
+ // need to expand the triplet size
+ ensureCapacity(i + 1, true);
+ return i;
+ }
+
+ @Override
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
+ int index = blockIndex;
+ DatanodeStorageInfo old = getStorageInfo(index);
+ if (old != null && !old.equals(storage)) { // over replicated
+ // check if the storage has been stored
+ int i = findStorageInfo(storage);
+ if (i == -1) {
+ index = findSlot();
+ } else {
+ return true;
+ }
+ }
+ addStorage(storage, index, blockIndex);
+ return true;
+ }
+
+ private void addStorage(DatanodeStorageInfo storage, int index,
+ int blockIndex) {
+ setStorageInfo(index, storage);
+ setNext(index, null);
+ setPrevious(index, null);
+ indices[index] = (byte) blockIndex;
+ }
+
+ private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
+ final int len = getCapacity();
+ for(int idx = len - 1; idx >= 0; idx--) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if (storage.equals(cur)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ int getStorageBlockIndex(DatanodeStorageInfo storage) {
+ int i = this.findStorageInfo(storage);
+ return i == -1 ? -1 : indices[i];
+ }
+
+ /**
+ * Identify the block stored in the given datanode storage. Note that
+ * the returned block has the same block Id with the one seen/reported by the
+ * DataNode.
+ */
+ Block getBlockOnStorage(DatanodeStorageInfo storage) {
+ int index = getStorageBlockIndex(storage);
+ if (index < 0) {
+ return null;
+ } else {
+ Block block = new Block(this);
+ block.setBlockId(this.getBlockId() + index);
+ return block;
+ }
+ }
+
+ @Override
+ boolean removeStorage(DatanodeStorageInfo storage) {
+ int dnIndex = findStorageInfoFromEnd(storage);
+ if (dnIndex < 0) { // the node is not found
+ return false;
+ }
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is still in the list and must be removed first.";
+ // set the triplet to null
+ setStorageInfo(dnIndex, null);
+ setNext(dnIndex, null);
+ setPrevious(dnIndex, null);
+ indices[dnIndex] = -1;
+ return true;
+ }
+
+ private void ensureCapacity(int totalSize, boolean keepOld) {
+ if (getCapacity() < totalSize) {
+ Object[] old = triplets;
+ byte[] oldIndices = indices;
+ triplets = new Object[totalSize * 3];
+ indices = new byte[totalSize];
+ initIndices();
+
+ if (keepOld) {
+ System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
+ }
+ }
+ }
+
+ @Override
+ void replaceBlock(BlockInfo newBlock) {
+ assert newBlock instanceof BlockInfoStriped;
+ BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
+ final int size = getCapacity();
+ newBlockGroup.ensureCapacity(size, false);
+ for (int i = 0; i < size; i++) {
+ final DatanodeStorageInfo storage = this.getStorageInfo(i);
+ if (storage != null) {
+ final int blockIndex = indices[i];
+ final boolean removed = storage.removeBlock(this);
+ assert removed : "currentBlock not found.";
+
+ newBlockGroup.addStorage(storage, i, blockIndex);
+ storage.insertToList(newBlockGroup);
+ }
+ }
+ }
+
+ public long spaceConsumed() {
+ // In the case of striped blocks, the total usage of this block group should
+ // cover both data blocks and parity blocks, because `getNumBytes` only
+ // accounts for the actual data size.
+ return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
+ ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
+ BLOCK_STRIPED_CELL_SIZE);
+ }
+
+ @Override
+ public final boolean isStriped() {
+ return true;
+ }
+
+ @Override
+ public int numNodes() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ int num = 0;
+ for (int idx = getCapacity()-1; idx >= 0; idx--) {
+ if (getStorageInfo(idx) != null) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ /**
+ * Convert a complete block to an under construction block.
+ * @return BlockInfoUnderConstruction - an under construction block.
+ */
- public BlockInfoUnderConstructionStriped convertToBlockUnderConstruction(
++ public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
+ BlockUCState s, DatanodeStorageInfo[] targets) {
- final BlockInfoUnderConstructionStriped ucBlock;
++ final BlockInfoStripedUnderConstruction ucBlock;
+ if(isComplete()) {
- ucBlock = new BlockInfoUnderConstructionStriped(this, ecPolicy,
++ ucBlock = new BlockInfoStripedUnderConstruction(this, ecPolicy,
+ s, targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ } else {
+ // the block is already under construction
- ucBlock = (BlockInfoUnderConstructionStriped) this;
++ ucBlock = (BlockInfoStripedUnderConstruction) this;
+ ucBlock.setBlockUCState(s);
+ ucBlock.setExpectedLocations(targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ }
+ return ucBlock;
+ }
+
+ @Override
+ final boolean hasNoStorage() {
+ final int len = getCapacity();
+ for(int idx = 0; idx < len; idx++) {
+ if (getStorageInfo(idx) != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
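
The class above leans on a simple ID convention: within a block group, the i-th internal block's ID is the group's base ID plus the block index i, which is exactly the arithmetic getBlockOnStorage() here and setExpectedLocations() in the under-construction subclass below rely on. The stand-alone sketch that follows is not part of the patch; the class name, group ID, and codec parameters are hypothetical, and the real decoding of a reported block's index is done by BlockIdManager.getBlockIndex(), which is not reproduced here. Relatedly, getRealDataBlockNum() above computes the number of data blocks that actually hold bytes as a ceiling division of the block length by the cell size, capped at the policy's data unit count, so a committed group whose length spans two and a half cells has three real data blocks.

// Illustrative sketch only: the block-ID arithmetic used by the striped
// block group above. The group ID, codec parameters, and the reverse lookup
// are assumptions for demonstration; the real decoding is handled by
// BlockIdManager.getBlockIndex().
public class StripedBlockIdSketch {
  public static void main(String[] args) {
    final long groupId = -9223372036854775792L; // hypothetical block group ID
    final int dataUnits = 6;                    // e.g. a (6, 3) policy
    final int parityUnits = 3;

    // Internal block IDs: data blocks first, then parity blocks.
    for (int index = 0; index < dataUnits + parityUnits; index++) {
      long internalId = groupId + index; // same arithmetic as getBlockOnStorage()
      System.out.println("index " + index + " -> block id " + internalId);
    }

    // In this simplified picture, recovering the index from a reported
    // internal block ID is just the offset from the group ID.
    long reportedId = groupId + 4;
    System.out.println("reported id " + reportedId + " -> index "
        + (reportedId - groupId));
  }
}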
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
index 0000000,0000000..9de8294
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@@ -1,0 -1,0 +1,297 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.hdfs.protocol.Block;
++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
++import org.apache.hadoop.hdfs.server.namenode.NameNode;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++
++import java.io.IOException;
++
++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
++
++/**
++ * Represents a striped block that is currently being constructed.
++ * This is usually the last block of a file opened for write or append.
++ */
++public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
++ implements BlockInfoUnderConstruction{
++ private BlockUCState blockUCState;
++
++ /**
++ * Block replicas as assigned when the block was allocated.
++ */
++ private ReplicaUnderConstruction[] replicas;
++
++ /**
++ * Index of the primary data node doing the recovery. Useful for log
++ * messages.
++ */
++ private int primaryNodeIndex = -1;
++
++ /**
++ * The new generation stamp, which this block will have
++ * after the recovery succeeds. Also used as a recovery id to identify
++ * the right recovery if any of the abandoned recoveries re-appear.
++ */
++ private long blockRecoveryId = 0;
++
++ /**
++ * Constructor with null storage targets.
++ */
++ public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy) {
++ this(blk, ecPolicy, UNDER_CONSTRUCTION, null);
++ }
++
++ /**
++ * Create a striped block that is currently being constructed.
++ */
++ public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy,
++ BlockUCState state, DatanodeStorageInfo[] targets) {
++ super(blk, ecPolicy);
++ assert getBlockUCState() != COMPLETE :
++ "BlockInfoStripedUnderConstruction cannot be in COMPLETE state";
++ this.blockUCState = state;
++ setExpectedLocations(targets);
++ }
++
++ @Override
++ public BlockInfoStriped convertToCompleteBlock() throws IOException {
++ assert getBlockUCState() != COMPLETE :
++ "Trying to convert a COMPLETE block";
++ return new BlockInfoStriped(this);
++ }
++
++ /** Set expected locations */
++ @Override
++ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
++ int numLocations = targets == null ? 0 : targets.length;
++ this.replicas = new ReplicaUnderConstruction[numLocations];
++ for(int i = 0; i < numLocations; i++) {
++ // when creating a new block we simply assign block indices to the
++ // storages sequentially
++ Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp());
++ replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
++ ReplicaState.RBW);
++ }
++ }
++
++ /**
++ * Create array of expected replica locations
++ * (as has been assigned by chooseTargets()).
++ */
++ @Override
++ public DatanodeStorageInfo[] getExpectedStorageLocations() {
++ int numLocations = getNumExpectedLocations();
++ DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
++ for (int i = 0; i < numLocations; i++) {
++ storages[i] = replicas[i].getExpectedStorageLocation();
++ }
++ return storages;
++ }
++
++ /** @return the index array indicating the block index in each storage */
++ public int[] getBlockIndices() {
++ int numLocations = getNumExpectedLocations();
++ int[] indices = new int[numLocations];
++ for (int i = 0; i < numLocations; i++) {
++ indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
++ }
++ return indices;
++ }
++
++ @Override
++ public int getNumExpectedLocations() {
++ return replicas == null ? 0 : replicas.length;
++ }
++
++ /**
++ * Return the state of the block under construction.
++ * @see BlockUCState
++ */
++ @Override // BlockInfo
++ public BlockUCState getBlockUCState() {
++ return blockUCState;
++ }
++
++ void setBlockUCState(BlockUCState s) {
++ blockUCState = s;
++ }
++
++ @Override
++ public long getBlockRecoveryId() {
++ return blockRecoveryId;
++ }
++
++ @Override
++ public Block getTruncateBlock() {
++ return null;
++ }
++
++ @Override
++ public Block toBlock(){
++ return this;
++ }
++
++ @Override
++ public void setGenerationStampAndVerifyReplicas(long genStamp) {
++ // Set the generation stamp for the block.
++ setGenerationStamp(genStamp);
++ if (replicas == null)
++ return;
++
++ // Remove replicas with the wrong gen stamp from their storages;
++ // the replica list itself is unchanged.
++ for (ReplicaUnderConstruction r : replicas) {
++ if (genStamp != r.getGenerationStamp()) {
++ r.getExpectedStorageLocation().removeBlock(this);
++ NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
++ + "from location: {}", r.getExpectedStorageLocation());
++ }
++ }
++ }
++
++ @Override
++ public void commitBlock(Block block) throws IOException {
++ if (getBlockId() != block.getBlockId()) {
++ throw new IOException("Trying to commit inconsistent block: id = "
++ + block.getBlockId() + ", expected id = " + getBlockId());
++ }
++ blockUCState = BlockUCState.COMMITTED;
++ this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
++ // Sort out invalid replicas.
++ setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
++ }
++
++ @Override
++ public void initializeBlockRecovery(long recoveryId) {
++ setBlockUCState(BlockUCState.UNDER_RECOVERY);
++ blockRecoveryId = recoveryId;
++ if (replicas == null || replicas.length == 0) {
++ NameNode.blockStateChangeLog.warn("BLOCK*" +
++ " BlockInfoStripedUnderConstruction.initLeaseRecovery:" +
++ " No blocks found, lease removed.");
++ // Set the primary node index and return.
++ primaryNodeIndex = -1;
++ return;
++ }
++ boolean allLiveReplicasTriedAsPrimary = true;
++ for (ReplicaUnderConstruction replica : replicas) {
++ // Check if all replicas have been tried or not.
++ if (replica.isAlive()) {
++ allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
++ replica.getChosenAsPrimary());
++ }
++ }
++ if (allLiveReplicasTriedAsPrimary) {
++ // Just set all the replicas to be chosen whether they are alive or not.
++ for (ReplicaUnderConstruction replica : replicas) {
++ replica.setChosenAsPrimary(false);
++ }
++ }
++ long mostRecentLastUpdate = 0;
++ ReplicaUnderConstruction primary = null;
++ primaryNodeIndex = -1;
++ for(int i = 0; i < replicas.length; i++) {
++ // Skip alive replicas which have been chosen for recovery.
++ if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
++ continue;
++ }
++ final ReplicaUnderConstruction ruc = replicas[i];
++ final long lastUpdate = ruc.getExpectedStorageLocation()
++ .getDatanodeDescriptor().getLastUpdateMonotonic();
++ if (lastUpdate > mostRecentLastUpdate) {
++ primaryNodeIndex = i;
++ primary = ruc;
++ mostRecentLastUpdate = lastUpdate;
++ }
++ }
++ if (primary != null) {
++ primary.getExpectedStorageLocation().getDatanodeDescriptor()
++ .addBlockToBeRecovered(this);
++ primary.setChosenAsPrimary(true);
++ NameNode.blockStateChangeLog.info(
++ "BLOCK* {} recovery started, primary={}", this, primary);
++ }
++ }
++
++ @Override
++ public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
++ Block reportedBlock, ReplicaState rState) {
++ if (replicas == null) {
++ replicas = new ReplicaUnderConstruction[1];
++ replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);
++ } else {
++ for (int i = 0; i < replicas.length; i++) {
++ DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation();
++ if (expected == storage) {
++ replicas[i].setBlockId(reportedBlock.getBlockId());
++ replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
++ return;
++ } else if (expected != null && expected.getDatanodeDescriptor() ==
++ storage.getDatanodeDescriptor()) {
++ // The Datanode reported that the block is on a different storage
++ // than the one chosen by BlockPlacementPolicy. This can occur as
++ // we allow Datanodes to choose the target storage. Update our
++ // state by removing the stale entry and adding a new one.
++ replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
++ rState);
++ return;
++ }
++ }
++ ReplicaUnderConstruction[] newReplicas =
++ new ReplicaUnderConstruction[replicas.length + 1];
++ System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
++ newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
++ reportedBlock, storage, rState);
++ replicas = newReplicas;
++ }
++ }
++
++ @Override
++ public String toString() {
++ final StringBuilder b = new StringBuilder(100);
++ appendStringTo(b);
++ return b.toString();
++ }
++
++ @Override
++ public void appendStringTo(StringBuilder sb) {
++ super.appendStringTo(sb);
++ appendUCParts(sb);
++ }
++
++ private void appendUCParts(StringBuilder sb) {
++ sb.append("{UCState=").append(blockUCState).
++ append(", primaryNodeIndex=").append(primaryNodeIndex).
++ append(", replicas=[");
++ if (replicas != null) {
++ int i = 0;
++ for (ReplicaUnderConstruction r : replicas) {
++ r.appendStringTo(sb);
++ if (++i < replicas.length) {
++ sb.append(", ");
++ }
++ }
++ }
++ sb.append("]}");
++ }
++}
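
Both under-construction classes in this patch (the contiguous one earlier in the diff and the striped one above) apply the same recovery-primary policy: if every live replica has already been tried as primary, clear the chosen flags, then pick the live, not-yet-chosen replica whose datanode has the most recent heartbeat. The sketch below restates that policy outside the BlockInfo hierarchy for readability; the Replica class and method names are hypothetical stand-ins, not part of the patch.

// Illustrative sketch only: the primary-selection policy used by
// initializeBlockRecovery() above, extracted into a stand-alone method.
import java.util.Arrays;
import java.util.List;

public class PrimarySelectionSketch {
  // Hypothetical stand-in for ReplicaUnderConstruction plus the last
  // heartbeat time of its DatanodeDescriptor.
  static class Replica {
    boolean alive;
    boolean chosenAsPrimary;
    long lastUpdateMonotonic;
    Replica(boolean alive, boolean chosen, long lastUpdate) {
      this.alive = alive;
      this.chosenAsPrimary = chosen;
      this.lastUpdateMonotonic = lastUpdate;
    }
  }

  /** Returns the index of the replica to use as recovery primary, or -1. */
  static int choosePrimary(List<Replica> replicas) {
    // If every live replica has already been tried as primary, clear the
    // flags so that all replicas become candidates again.
    boolean allLiveTried = true;
    for (Replica r : replicas) {
      if (r.alive) {
        allLiveTried = allLiveTried && r.chosenAsPrimary;
      }
    }
    if (allLiveTried) {
      for (Replica r : replicas) {
        r.chosenAsPrimary = false;
      }
    }
    // Pick the live, not-yet-chosen replica with the freshest heartbeat.
    // The caller would then mark it chosen and schedule the recovery.
    int primaryIndex = -1;
    long mostRecent = 0;
    for (int i = 0; i < replicas.size(); i++) {
      Replica r = replicas.get(i);
      if (!r.alive || r.chosenAsPrimary) {
        continue;
      }
      if (r.lastUpdateMonotonic > mostRecent) {
        primaryIndex = i;
        mostRecent = r.lastUpdateMonotonic;
      }
    }
    return primaryIndex;
  }

  public static void main(String[] args) {
    List<Replica> replicas = Arrays.asList(
        new Replica(true, true, 100),    // live, already tried
        new Replica(true, false, 250),   // freshest untried live replica
        new Replica(false, false, 300)); // dead, ignored
    System.out.println("primary index = " + choosePrimary(replicas)); // 1
  }
}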