Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:04 UTC

[04/50] [abbrv] hadoop git commit: Merge commit '456e901a4c5c639267ee87b8e5f1319f256d20c2' (HDFS-6407. Add sorting and pagination in the datanode tab of the NN Web UI. Contributed by Haohui Mai.) into HDFS-7285-merge

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index b71e59e,0000000..4ca8fe6
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@@ -1,653 -1,0 +1,653 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs;
 +
 +import java.io.IOException;
 +import java.io.InterruptedIOException;
 +import java.nio.ByteBuffer;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.List;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import org.apache.hadoop.HadoopIllegalArgumentException;
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.fs.CreateFlag;
 +import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 +import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 +import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 +import org.apache.hadoop.io.MultipleIOException;
 +import org.apache.hadoop.io.erasurecode.CodecUtil;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 +import org.apache.hadoop.util.DataChecksum;
 +import org.apache.hadoop.util.Progressable;
 +import org.apache.htrace.Sampler;
 +import org.apache.htrace.Trace;
 +import org.apache.htrace.TraceScope;
 +
 +import com.google.common.base.Preconditions;
 +
 +
 +/**
 + * This class supports writing files in striped layout and erasure coded format.
 + * Each stripe contains a sequence of cells.
 + */
 +@InterfaceAudience.Private
 +public class DFSStripedOutputStream extends DFSOutputStream {
 +  static class MultipleBlockingQueue<T> {
 +    private final List<BlockingQueue<T>> queues;
 +
 +    MultipleBlockingQueue(int numQueue, int queueSize) {
 +      queues = new ArrayList<>(numQueue);
 +      for (int i = 0; i < numQueue; i++) {
 +        queues.add(new LinkedBlockingQueue<T>(queueSize));
 +      }
 +    }
 +
 +    boolean isEmpty() {
 +      for(int i = 0; i < queues.size(); i++) {
 +        if (!queues.get(i).isEmpty()) {
 +          return false;
 +        }
 +      }
 +      return true;
 +    }
 +
 +    int numQueues() {
 +      return queues.size();
 +    }
 +
 +    void offer(int i, T object) {
 +      final boolean b = queues.get(i).offer(object);
 +      Preconditions.checkState(b, "Failed to offer " + object
 +          + " to queue, i=" + i);
 +    }
 +
 +    T take(int i) throws InterruptedIOException {
 +      try {
 +        return queues.get(i).take();
 +      } catch(InterruptedException ie) {
 +        throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
 +      }
 +    }
 +
 +    T poll(int i) {
 +      return queues.get(i).poll();
 +    }
 +
 +    T peek(int i) {
 +      return queues.get(i).peek();
 +    }
 +  }
 +
 +  /** Coordinate the communication between the streamers. */
 +  class Coordinator {
 +    private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
 +    private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
 +
 +    private final MultipleBlockingQueue<LocatedBlock> newBlocks;
 +    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
 +
 +    Coordinator(final DfsClientConf conf, final int numDataBlocks,
 +        final int numAllBlocks) {
 +      followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
 +      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
 +
 +      newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
 +      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
 +    }
 +
 +    MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
 +      return followingBlocks;
 +    }
 +
 +    MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
 +      return newBlocks;
 +    }
 +
 +    MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
 +      return updateBlocks;
 +    }
 +
 +    StripedDataStreamer getStripedDataStreamer(int i) {
 +      return DFSStripedOutputStream.this.getStripedDataStreamer(i);
 +    }
 +
 +    void offerEndBlock(int i, ExtendedBlock block) {
 +      endBlocks.offer(i, block);
 +    }
 +
 +    ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
 +      return endBlocks.take(i);
 +    }
 +
 +    boolean hasAllEndBlocks() {
 +      for(int i = 0; i < endBlocks.numQueues(); i++) {
 +        if (endBlocks.peek(i) == null) {
 +          return false;
 +        }
 +      }
 +      return true;
 +    }
 +
 +    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
 +      ExtendedBlock b = endBlocks.peek(i);
 +      if (b == null) {
 +        // the streamer has just failed; put the end block and continue
 +        b = block;
 +        offerEndBlock(i, b);
 +      }
 +      b.setNumBytes(newBytes);
 +    }
 +
 +    /** @return a block representing the entire block group. */
 +    ExtendedBlock getBlockGroup() {
 +      final StripedDataStreamer s0 = getStripedDataStreamer(0);
 +      final ExtendedBlock b0 = s0.getBlock();
 +      if (b0 == null) {
 +        return null;
 +      }
 +
 +      final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
 +      final ExtendedBlock block = new ExtendedBlock(b0);
 +      long numBytes = b0.getNumBytes();
 +      for (int i = 1; i < numDataBlocks; i++) {
 +        final StripedDataStreamer si = getStripedDataStreamer(i);
 +        final ExtendedBlock bi = si.getBlock();
 +        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
 +          block.setGenerationStamp(bi.getGenerationStamp());
 +        }
 +        numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
 +      }
 +      block.setNumBytes(numBytes);
 +      if (LOG.isDebugEnabled()) {
 +        LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
 +      }
 +      return block;
 +    }
 +  }
 +
 +  /** Buffers for writing the data and parity cells of a stripe. */
 +  class CellBuffers {
 +    private final ByteBuffer[] buffers;
 +    private final byte[][] checksumArrays;
 +
 +    CellBuffers(int numParityBlocks) throws InterruptedException{
 +      if (cellSize % bytesPerChecksum != 0) {
 +        throw new HadoopIllegalArgumentException("Invalid values: "
 +            + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
 +            + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
 +      }
 +
 +      checksumArrays = new byte[numParityBlocks][];
 +      final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
 +      for (int i = 0; i < checksumArrays.length; i++) {
 +        checksumArrays[i] = new byte[size];
 +      }
 +
 +      buffers = new ByteBuffer[numAllBlocks];
 +      for (int i = 0; i < buffers.length; i++) {
 +        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
 +      }
 +    }
 +
 +    private ByteBuffer[] getBuffers() {
 +      return buffers;
 +    }
 +
 +    byte[] getChecksumArray(int i) {
 +      return checksumArrays[i - numDataBlocks];
 +    }
 +
 +    private int addTo(int i, byte[] b, int off, int len) {
 +      final ByteBuffer buf = buffers[i];
 +      final int pos = buf.position() + len;
 +      Preconditions.checkState(pos <= cellSize);
 +      buf.put(b, off, len);
 +      return pos;
 +    }
 +
 +    private void clear() {
 +      for (int i = 0; i< numAllBlocks; i++) {
 +        buffers[i].clear();
 +        if (i >= numDataBlocks) {
 +          Arrays.fill(buffers[i].array(), (byte) 0);
 +        }
 +      }
 +    }
 +
 +    private void release() {
 +      for (int i = 0; i < numAllBlocks; i++) {
 +        byteArrayManager.release(buffers[i].array());
 +      }
 +    }
 +
 +    private void flipDataBuffers() {
 +      for (int i = 0; i < numDataBlocks; i++) {
 +        buffers[i].flip();
 +      }
 +    }
 +  }
 +
 +  private final Coordinator coordinator;
 +  private final CellBuffers cellBuffers;
 +  private final RawErasureEncoder encoder;
 +  private final List<StripedDataStreamer> streamers;
 +  private final DFSPacket[] currentPackets; // current Packet of each streamer
 +
 +  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
 +  private final int cellSize;
 +  private final int numAllBlocks;
 +  private final int numDataBlocks;
 +
 +  @Override
 +  ExtendedBlock getBlock() {
 +    return coordinator.getBlockGroup();
 +  }
 +
 +  /** Construct a new output stream for creating a file. */
 +  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
 +                         EnumSet<CreateFlag> flag, Progressable progress,
 +                         DataChecksum checksum, String[] favoredNodes)
 +                         throws IOException {
 +    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("Creating DFSStripedOutputStream for " + src);
 +    }
 +
 +    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
 +    final int numParityBlocks = ecPolicy.getNumParityUnits();
 +    cellSize = ecPolicy.getCellSize();
 +    numDataBlocks = ecPolicy.getNumDataUnits();
 +    numAllBlocks = numDataBlocks + numParityBlocks;
 +
 +    encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
 +        numDataBlocks, numParityBlocks);
 +
 +    coordinator = new Coordinator(dfsClient.getConf(),
 +        numDataBlocks, numAllBlocks);
 +    try {
 +      cellBuffers = new CellBuffers(numParityBlocks);
 +    } catch (InterruptedException ie) {
 +      throw DFSUtil.toInterruptedIOException(
 +          "Failed to create cell buffers", ie);
 +    }
 +
 +    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
 +    for (short i = 0; i < numAllBlocks; i++) {
 +      StripedDataStreamer streamer = new StripedDataStreamer(stat,
 +          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
 +          favoredNodes, i, coordinator);
 +      s.add(streamer);
 +    }
 +    streamers = Collections.unmodifiableList(s);
 +    currentPackets = new DFSPacket[streamers.size()];
 +    setCurrentStreamer(0);
 +  }
 +
 +  StripedDataStreamer getStripedDataStreamer(int i) {
 +    return streamers.get(i);
 +  }
 +
 +  int getCurrentIndex() {
 +    return getCurrentStreamer().getIndex();
 +  }
 +
 +  private synchronized StripedDataStreamer getCurrentStreamer() {
 +    return (StripedDataStreamer)streamer;
 +  }
 +
 +  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
 +      throws IOException {
 +    // backup currentPacket for current streamer
 +    int oldIdx = streamers.indexOf(streamer);
 +    if (oldIdx >= 0) {
 +      currentPackets[oldIdx] = currentPacket;
 +    }
 +
 +    streamer = streamers.get(newIdx);
 +    currentPacket = currentPackets[newIdx];
 +    adjustChunkBoundary();
 +
 +    return getCurrentStreamer();
 +  }
 +
 +  /**
 +   * Encode the buffers, i.e. compute parities.
 +   *
 +   * @param buffers data buffers + parity buffers
 +   */
 +  private static void encode(RawErasureEncoder encoder, int numData,
 +      ByteBuffer[] buffers) {
 +    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
 +    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
 +    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
 +    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
 +
 +    encoder.encode(dataBuffers, parityBuffers);
 +  }
 +
 +
 +  private void checkStreamers() throws IOException {
 +    int count = 0;
 +    for(StripedDataStreamer s : streamers) {
 +      if (!s.isFailed()) {
 +        if (s.getBlock() != null) {
 +          s.getErrorState().initExternalError();
 +        }
 +        count++;
 +      }
 +    }
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("checkStreamers: " + streamers);
 +      LOG.debug("count=" + count);
 +    }
 +    if (count < numDataBlocks) {
 +      throw new IOException("Failed: the number of remaining blocks = "
 +          + count + " < the number of data blocks = " + numDataBlocks);
 +    }
 +  }
 +
 +  private void handleStreamerFailure(String err,
 +                                     Exception e) throws IOException {
 +    LOG.warn("Failed: " + err + ", " + this, e);
 +    getCurrentStreamer().setFailed(true);
 +    checkStreamers();
 +    currentPacket = null;
 +  }
 +
 +  @Override
 +  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
 +      byte[] checksum, int ckoff, int cklen) throws IOException {
 +    final int index = getCurrentIndex();
 +    final StripedDataStreamer current = getCurrentStreamer();
 +    final int pos = cellBuffers.addTo(index, bytes, offset, len);
 +    final boolean cellFull = pos == cellSize;
 +
 +    final long oldBytes = current.getBytesCurBlock();
 +    if (!current.isFailed()) {
 +      try {
 +        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
 +      } catch(Exception e) {
 +        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
 +      }
 +    }
 +
 +    if (current.isFailed()) {
 +      final long newBytes = oldBytes + len;
 +      coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
 +      current.setBytesCurBlock(newBytes);
 +    }
 +
 +    // Two extra steps are needed when a striping cell is full:
 +    // 1. Forward the current index pointer
 +    // 2. Generate parity packets if a full stripe of data cells are present
 +    if (cellFull) {
 +      int next = index + 1;
 +      //When all data cells in a stripe are ready, we need to encode
 +      //them and generate the parity cells. These cells will be
 +      //converted to packets and put into their DataStreamers' queues.
 +      if (next == numDataBlocks) {
 +        cellBuffers.flipDataBuffers();
 +        writeParityCells();
 +        next = 0;
 +      }
 +      setCurrentStreamer(next);
 +    }
 +  }
 +
 +  private int stripeDataSize() {
 +    return numDataBlocks * cellSize;
 +  }
 +
 +  @Override
 +  public void hflush() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public void hsync() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  protected synchronized void start() {
 +    for (StripedDataStreamer streamer : streamers) {
 +      streamer.start();
 +    }
 +  }
 +
 +  @Override
 +  synchronized void abort() throws IOException {
 +    if (isClosed()) {
 +      return;
 +    }
 +    for (StripedDataStreamer streamer : streamers) {
 +      streamer.getLastException().set(new IOException("Lease timeout of "
 +          + (dfsClient.getConf().getHdfsTimeout()/1000) +
 +          " seconds expired."));
 +    }
 +    closeThreads(true);
 +    dfsClient.endFileLease(fileId);
 +  }
 +
 +  @Override
 +  boolean isClosed() {
 +    if (closed) {
 +      return true;
 +    }
 +    for(StripedDataStreamer s : streamers) {
 +      if (!s.streamerClosed()) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +
 +  @Override
 +  protected void closeThreads(boolean force) throws IOException {
 +    final MultipleIOException.Builder b = new MultipleIOException.Builder();
 +    try {
 +      for (StripedDataStreamer streamer : streamers) {
 +        try {
 +          streamer.close(force);
 +          streamer.join();
 +          streamer.closeSocket();
 +        } catch (Exception e) {
 +          try {
 +            handleStreamerFailure("force=" + force, e);
 +          } catch (IOException ioe) {
 +            b.add(ioe);
 +          }
 +        } finally {
 +          streamer.setSocketToNull();
 +        }
 +      }
 +    } finally {
 +      setClosed();
 +    }
 +    final IOException ioe = b.build();
 +    if (ioe != null) {
 +      throw ioe;
 +    }
 +  }
 +
 +  /**
 +   * Sum the bytesCurBlock of all data streamers. Note that the result is not
 +   * necessarily the accurate size of the block group.
 +   */
 +  private long getCurrentSumBytes() {
 +    long sum = 0;
 +    for (int i = 0; i < numDataBlocks; i++) {
 +      sum += streamers.get(i).getBytesCurBlock();
 +    }
 +    return sum;
 +  }
 +
 +  private void writeParityCellsForLastStripe() throws IOException {
 +    final long currentBlockGroupBytes = getCurrentSumBytes();
 +    if (currentBlockGroupBytes % stripeDataSize() == 0) {
 +      return;
 +    }
 +
 +    final int firstCellSize =
 +        (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
 +    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
 +        firstCellSize : cellSize;
 +    final ByteBuffer[] buffers = cellBuffers.getBuffers();
 +
 +    for (int i = 0; i < numAllBlocks; i++) {
 +      // Pad zero bytes to make all cells exactly the size of parityCellSize
 +      // If internal block is smaller than parity block, pad zero bytes.
 +      // Also pad zero bytes to all parity cells
 +      final int position = buffers[i].position();
 +      assert position <= parityCellSize : "If an internal block is smaller" +
 +          " than a parity block, then its last cell should be smaller than the" +
 +          " last parity cell";
 +      for (int j = 0; j < parityCellSize - position; j++) {
 +        buffers[i].put((byte) 0);
 +      }
 +      buffers[i].flip();
 +    }
 +
 +    writeParityCells();
 +  }
 +
 +  void writeParityCells() throws IOException {
 +    final ByteBuffer[] buffers = cellBuffers.getBuffers();
 +    //encode the data cells
 +    encode(encoder, numDataBlocks, buffers);
 +    for (int i = numDataBlocks; i < numAllBlocks; i++) {
 +      writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
 +    }
 +    cellBuffers.clear();
 +  }
 +
 +  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
 +      ) throws IOException {
 +    final StripedDataStreamer current = setCurrentStreamer(index);
 +    final int len = buffer.limit();
 +
 +    final long oldBytes = current.getBytesCurBlock();
 +    if (!current.isFailed()) {
 +      try {
 +        DataChecksum sum = getDataChecksum();
 +        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
 +        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
 +          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
 +          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
 +          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
 +              getChecksumSize());
 +        }
 +      } catch(Exception e) {
 +        handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
 +      }
 +    }
 +
 +    if (current.isFailed()) {
 +      final long newBytes = oldBytes + len;
 +      current.setBytesCurBlock(newBytes);
 +    }
 +  }
 +
 +  @Override
 +  void setClosed() {
 +    super.setClosed();
 +    for (int i = 0; i < numAllBlocks; i++) {
 +      streamers.get(i).release();
 +    }
 +    cellBuffers.release();
 +  }
 +
 +  @Override
 +  protected synchronized void closeImpl() throws IOException {
 +    if (isClosed()) {
 +      final MultipleIOException.Builder b = new MultipleIOException.Builder();
 +      for(int i = 0; i < streamers.size(); i++) {
 +        final StripedDataStreamer si = getStripedDataStreamer(i);
 +        try {
 +          si.getLastException().check(true);
 +        } catch (IOException e) {
 +          b.add(e);
 +        }
 +      }
 +      final IOException ioe = b.build();
 +      if (ioe != null) {
 +        throw ioe;
 +      }
 +      return;
 +    }
 +
 +    try {
 +      // flush from all upper layers
 +      try {
 +        flushBuffer();
 +        // if the last stripe is incomplete, generate and write parity cells
 +        writeParityCellsForLastStripe();
 +        enqueueAllCurrentPackets();
 +      } catch(Exception e) {
 +        handleStreamerFailure("closeImpl", e);
 +      }
 +
 +      for (int i = 0; i < numAllBlocks; i++) {
 +        final StripedDataStreamer s = setCurrentStreamer(i);
 +        if (!s.isFailed()) {
 +          try {
 +            if (s.getBytesCurBlock() > 0) {
-               setCurrentPacket2Empty();
++              setCurrentPacketToEmpty();
 +            }
 +            // flush all data to Datanode
 +            flushInternal();
 +          } catch(Exception e) {
 +            handleStreamerFailure("closeImpl", e);
 +          }
 +        }
 +      }
 +
 +      closeThreads(false);
 +      final ExtendedBlock lastBlock = coordinator.getBlockGroup();
 +      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
 +      try {
 +        completeFile(lastBlock);
 +      } finally {
 +        scope.close();
 +      }
 +      dfsClient.endFileLease(fileId);
 +    } catch (ClosedChannelException ignored) {
 +    } finally {
 +      setClosed();
 +    }
 +  }
 +
 +  private void enqueueAllCurrentPackets() throws IOException {
 +    int idx = streamers.indexOf(getCurrentStreamer());
 +    for(int i = 0; i < streamers.size(); i++) {
 +      setCurrentStreamer(i);
 +      if (currentPacket != null) {
 +        enqueueCurrentPacket();
 +      }
 +    }
 +    setCurrentStreamer(idx);
 +  }
 +}
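
The write path above rotates the current streamer each time a striping cell fills and, once a full stripe of data cells has been buffered, encodes them into parity cells before handing every cell to its streamer (writeChunk -> flipDataBuffers -> writeParityCells). Below is a minimal, self-contained sketch of that rotation and encoding flow; the class name, constants and the XOR "encoder" are illustrative stand-ins, not the real RawErasureEncoder or cell sizes.

import java.nio.ByteBuffer;

/**
 * Toy model of the striped write path: bytes fill cell buffers round-robin;
 * once a full stripe of data cells is buffered, parity cells are computed
 * (XOR here instead of Reed-Solomon) and all cells are flushed to their
 * streamers. All names are hypothetical.
 */
public class StripedWriteSketch {
  static final int NUM_DATA = 6;    // data units, e.g. RS(6,3)
  static final int NUM_PARITY = 3;  // parity units
  static final int CELL_SIZE = 4;   // tiny cell size for the demo

  public static void main(String[] args) {
    ByteBuffer[] cells = new ByteBuffer[NUM_DATA + NUM_PARITY];
    for (int i = 0; i < cells.length; i++) {
      cells[i] = ByteBuffer.allocate(CELL_SIZE);
    }

    byte[] data = new byte[NUM_DATA * CELL_SIZE]; // exactly one full stripe
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte) i;
    }

    int current = 0; // index of the "current streamer"
    for (byte b : data) {
      cells[current].put(b);
      if (!cells[current].hasRemaining()) {   // the current cell is full
        current++;
        if (current == NUM_DATA) {            // a full stripe is buffered
          encodeXor(cells);
          flushAndClear(cells);
          current = 0;
        }
      }
    }
  }

  /** Stand-in for RawErasureEncoder#encode: XOR all data cells into each parity cell. */
  static void encodeXor(ByteBuffer[] cells) {
    for (int p = NUM_DATA; p < cells.length; p++) {
      for (int off = 0; off < CELL_SIZE; off++) {
        byte parity = 0;
        for (int d = 0; d < NUM_DATA; d++) {
          parity ^= cells[d].get(off);
        }
        cells[p].put(off, parity);
      }
    }
  }

  /** Stand-in for converting cells to packets and queueing them on streamers. */
  static void flushAndClear(ByteBuffer[] cells) {
    for (int i = 0; i < cells.length; i++) {
      System.out.println("cell " + i + " -> streamer " + i);
      cells[i].clear();
    }
  }
}

The real stream additionally handles per-streamer packets, checksums and streamer failures (handleStreamerFailure), which the sketch omits.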

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d55d00b,8e81fdc..50a367a
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@@ -199,12 -200,7 +201,13 @@@ import org.apache.hadoop.hdfs.protocol.
  import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
  import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
  import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index f2facd7,4ca5b26..c083b5e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -3124,193 -3067,7 +3141,194 @@@ public class PBHelper 
          setTotalRpcs(context.getTotalRpcs()).
          setCurRpc(context.getCurRpc()).
          setId(context.getReportId()).
+         setLeaseId(context.getLeaseId()).
          build();
    }
 +
 +  public static ECSchema convertECSchema(ECSchemaProto schema) {
 +    List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
 +    Map<String, String> options = new HashMap<>(optionsList.size());
 +    for (ECSchemaOptionEntryProto option : optionsList) {
 +      options.put(option.getKey(), option.getValue());
 +    }
 +    return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
 +        schema.getParityUnits(), options);
 +  }
 +
 +  public static ECSchemaProto convertECSchema(ECSchema schema) {
 +    ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
 +        .setCodecName(schema.getCodecName())
 +        .setDataUnits(schema.getNumDataUnits())
 +        .setParityUnits(schema.getNumParityUnits());
 +    Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
 +    for (Entry<String, String> entry : entrySet) {
 +      builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
 +          .setKey(entry.getKey()).setValue(entry.getValue()).build());
 +    }
 +    return builder.build();
 +  }
 +
 +  public static ErasureCodingPolicy convertErasureCodingPolicy(
 +      ErasureCodingPolicyProto policy) {
 +    return new ErasureCodingPolicy(policy.getName(),
 +        convertECSchema(policy.getSchema()),
 +        policy.getCellSize());
 +  }
 +
 +  public static ErasureCodingPolicyProto convertErasureCodingPolicy(
 +      ErasureCodingPolicy policy) {
 +    ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
 +        .newBuilder()
 +        .setName(policy.getName())
 +        .setSchema(convertECSchema(policy.getSchema()))
 +        .setCellSize(policy.getCellSize());
 +    return builder.build();
 +  }
 +
 +  public static ErasureCodingZoneProto convertErasureCodingZone(
 +      ErasureCodingZone ecZone) {
 +    return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
 +        .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy()))
 +        .build();
 +  }
 +
 +  public static ErasureCodingZone convertErasureCodingZone(
 +      ErasureCodingZoneProto ecZoneProto) {
 +    return new ErasureCodingZone(ecZoneProto.getDir(),
 +        convertErasureCodingPolicy(ecZoneProto.getEcPolicy()));
 +  }
 +  
 +  public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
 +      BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
 +    ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
 +    ExtendedBlock block = convert(blockProto);
 +
 +    DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
 +        .getSourceDnInfos();
 +    DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
 +
 +    DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
 +        .getTargetDnInfos();
 +    DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
 +
 +    StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
 +        .getTargetStorageUuids();
 +    String[] targetStorageUuids = convert(targetStorageUuidsProto);
 +
 +    StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
 +        .getTargetStorageTypes();
 +    StorageType[] convertStorageTypes = convertStorageTypes(
 +        targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
 +            .getStorageTypesList().size());
 +
 +    List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
 +        .getLiveBlockIndicesList();
 +    short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
 +    for (int i = 0; i < liveBlockIndicesList.size(); i++) {
 +      liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
 +    }
 +
 +    ErasureCodingPolicy ecPolicy =
 +        convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
 +
 +    return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
 +        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
 +  }
 +
 +  public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
 +      BlockECRecoveryInfo blockEcRecoveryInfo) {
 +    BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
 +        .newBuilder();
 +    builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
 +
 +    DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
 +    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
 +
 +    DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
 +    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
 +
 +    String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
 +    builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
 +
 +    StorageType[] targetStorageTypes = blockEcRecoveryInfo
 +        .getTargetStorageTypes();
 +    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
 +
 +    short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
 +    builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
 +
 +    builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
 +        .getErasureCodingPolicy()));
 +
 +    return builder.build();
 +  }
 +
 +  private static List<Integer> convertIntArray(short[] liveBlockIndices) {
 +    List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
 +    for (short s : liveBlockIndices) {
 +      liveBlockIndicesList.add((int) s);
 +    }
 +    return liveBlockIndicesList;
 +  }
 +
 +  private static StorageTypesProto convertStorageTypesProto(
 +      StorageType[] targetStorageTypes) {
 +    StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
 +    for (StorageType storageType : targetStorageTypes) {
 +      builder.addStorageTypes(convertStorageType(storageType));
 +    }
 +    return builder.build();
 +  }
 +
 +  private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
 +    StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
 +    for (String storageUuid : targetStorageIDs) {
 +      builder.addStorageUuids(storageUuid);
 +    }
 +    return builder.build();
 +  }
 +
 +  private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
 +    DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
 +    for (DatanodeInfo datanodeInfo : dnInfos) {
 +      builder.addDatanodes(convert(datanodeInfo));
 +    }
 +    return builder.build();
 +  }
 +
 +  private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
 +    List<String> storageUuidsList = targetStorageUuidsProto
 +        .getStorageUuidsList();
 +    String[] storageUuids = new String[storageUuidsList.size()];
 +    for (int i = 0; i < storageUuidsList.size(); i++) {
 +      storageUuids[i] = storageUuidsList.get(i);
 +    }
 +    return storageUuids;
 +  }
 +  
 +  public static BlockECRecoveryCommandProto convert(
 +      BlockECRecoveryCommand blkECRecoveryCmd) {
 +    BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
 +        .newBuilder();
 +    Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
 +        .getECTasks();
 +    for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
 +      builder
 +          .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
 +    }
 +    return builder.build();
 +  }
 +  
 +  public static BlockECRecoveryCommand convert(
 +      BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
 +    Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
 +    List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
 +        .getBlockECRecoveryinfoList();
 +    for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
 +      blkECRecoveryInfos
 +          .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
 +    }
 +    return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
 +        blkECRecoveryInfos);
 +  }
  }
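
The PBHelper additions above follow the usual pattern of paired converters: one overload maps the protobuf form to the internal object (e.g. convertECSchema(ECSchemaProto)) and the other maps it back, so a conversion round trip should be lossless. A dependency-free sketch of that shape is shown below; the Schema/SchemaProto classes are toy stand-ins, not the real ECSchema and ECSchemaProto types.

import java.util.LinkedHashMap;
import java.util.Map;

/** Toy illustration of PBHelper-style paired converters (hypothetical types). */
public class ConverterSketch {

  /** Internal object, standing in for ECSchema. */
  static class Schema {
    final String codec;
    final int dataUnits;
    final int parityUnits;
    final Map<String, String> options;

    Schema(String codec, int dataUnits, int parityUnits, Map<String, String> options) {
      this.codec = codec;
      this.dataUnits = dataUnits;
      this.parityUnits = parityUnits;
      this.options = options;
    }
  }

  /** Wire form, standing in for ECSchemaProto. */
  static class SchemaProto {
    String codec;
    int dataUnits;
    int parityUnits;
    final Map<String, String> options = new LinkedHashMap<>();
  }

  /** proto -> internal, mirroring convertECSchema(ECSchemaProto). */
  static Schema convert(SchemaProto p) {
    return new Schema(p.codec, p.dataUnits, p.parityUnits, new LinkedHashMap<>(p.options));
  }

  /** internal -> proto, mirroring convertECSchema(ECSchema). */
  static SchemaProto convert(Schema s) {
    SchemaProto p = new SchemaProto();
    p.codec = s.codec;
    p.dataUnits = s.dataUnits;
    p.parityUnits = s.parityUnits;
    p.options.putAll(s.options);
    return p;
  }

  public static void main(String[] args) {
    Map<String, String> opts = new LinkedHashMap<>();
    opts.put("k", "v");
    Schema original = new Schema("rs", 6, 3, opts);
    Schema roundTripped = convert(convert(original));
    System.out.println(original.codec.equals(roundTripped.codec)
        && original.dataUnits == roundTripped.dataUnits
        && original.parityUnits == roundTripped.parityUnits
        && original.options.equals(roundTripped.options)); // expect: true
  }
}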

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 930001a,f9847ca..555f506
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@@ -291,10 -310,7 +316,8 @@@ public class Dispatcher 
  
      /** Dispatch the move to the proxy source & wait for the response. */
      private void dispatch() {
-       if (LOG.isDebugEnabled()) {
-         LOG.debug("Start moving " + this);
-       }
+       LOG.info("Start moving " + this);
 +      assert !(reportedBlock instanceof DBlockStriped);
  
        Socket sock = new Socket();
        DataOutputStream out = null;
@@@ -323,7 -339,8 +346,8 @@@
  
          sendRequest(out, eb, accessToken);
          receiveResponse(in);
 -        nnc.getBytesMoved().addAndGet(block.getNumBytes());
 +        nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
+         target.getDDatanode().setHasSuccess();
          LOG.info("Successfully moved " + this);
        } catch (IOException e) {
          LOG.warn("Failed to move " + this + ": " + e.getMessage());
@@@ -656,29 -650,25 +695,39 @@@
       * @return the total size of the received blocks in the number of bytes.
       */
      private long getBlockList() throws IOException {
-       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+       final long size = Math.min(getBlocksSize, blocksToReceive);
 -      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
 +      final BlocksWithLocations newBlksLocs =
 +          nnc.getBlocks(getDatanodeInfo(), size);
  
+       if (LOG.isTraceEnabled()) {
+         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+             + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
 -            + ") returns " + newBlocks.getBlocks().length + " blocks.");
++            + ") returns " + newBlksLocs.getBlocks().length + " blocks.");
+       }
+ 
        long bytesReceived = 0;
 -      for (BlockWithLocations blk : newBlocks.getBlocks()) {
 +      for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
+         // Skip small blocks.
 -        if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
++        if (blkLocs.getBlock().getNumBytes() < getBlocksMinBlockSize) {
+           continue;
+         }
  
 -        bytesReceived += blk.getBlock().getNumBytes();
 +        DBlock block;
 +        if (blkLocs instanceof StripedBlockWithLocations) {
 +          StripedBlockWithLocations sblkLocs =
 +              (StripedBlockWithLocations) blkLocs;
 +          // approximate size
 +          bytesReceived += sblkLocs.getBlock().getNumBytes() /
 +              sblkLocs.getDataBlockNum();
 +          block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
 +              sblkLocs.getDataBlockNum());
 +        } else{
 +          bytesReceived += blkLocs.getBlock().getNumBytes();
 +          block = new DBlock(blkLocs.getBlock());
 +        }
 +
          synchronized (globalBlocks) {
 -          final DBlock block = globalBlocks.get(blk.getBlock());
 +          block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
            synchronized (block) {
              block.clearLocations();
  
@@@ -944,8 -954,22 +1015,21 @@@
      return new DDatanode(datanode, maxConcurrentMovesPerNode);
    }
  
+ 
    public void executePendingMove(final PendingMove p) {
 -    // move the block
 +    // move the reportedBlock
+     final DDatanode targetDn = p.target.getDDatanode();
+     ExecutorService moveExecutor = targetDn.getMoveExecutor();
+     if (moveExecutor == null) {
+       final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+       if (nThreads > 0) {
+         moveExecutor = targetDn.initMoveExecutor(nThreads);
+       }
+     }
+     if (moveExecutor == null) {
+       LOG.warn("No mover threads available: skip moving " + p);
+       return;
+     }
 -
      moveExecutor.execute(new Runnable() {
        @Override
        public void run() {
@@@ -996,11 -1020,8 +1080,8 @@@
      return getBytesMoved() - bytesLastMoved;
    }
  
-   /** The sleeping period before checking if reportedBlock move is completed again */
-   static private long blockMoveWaitTime = 30000L;
- 
    /**
 -   * Wait for all block move confirmations.
 +   * Wait for all reportedBlock move confirmations.
     * @return true if there is failed move execution
     */
    public static boolean waitForMoveCompletion(
@@@ -1027,10 -1048,22 +1108,22 @@@
    }
  
    /**
+    * @return true if some moves succeeded.
+    */
+   public static boolean checkForSuccess(
+       Iterable<? extends StorageGroup> targets) {
+     boolean hasSuccess = false;
+     for (StorageGroup t : targets) {
+       hasSuccess |= t.getDDatanode().hasSuccess;
+     }
+     return hasSuccess;
+   }
+ 
+   /**
 -   * Decide if the block is a good candidate to be moved from source to target.
 -   * A block is a good candidate if
 +   * Decide if the block/blockGroup is a good candidate to be moved from source
 +   * to target. A block is a good candidate if
     * 1. the block is not in the process of being moved/has not been moved;
 -   * 2. the block does not have a replica on the target;
 +   * 2. the block does not have a replica/internalBlock on the target;
     * 3. doing the move does not reduce the number of racks that the block has
     */
    private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
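
In the getBlockList() hunk above, the balancer accounts for a striped block group with an approximate per-internal-block size: the group's byte length divided by its number of data blocks. A small worked example of that arithmetic, with hypothetical numbers:

/** Hypothetical numbers; only illustrates the division used in getBlockList(). */
public class StripedSizeSketch {
  public static void main(String[] args) {
    // An RS(6,3) block group carrying 6 x 128 MB of data.
    long blockGroupBytes = 6L * 128 * 1024 * 1024;
    int dataBlockNum = 6;
    // Approximate size of one internal (striped) block, as in the hunk above.
    long approxInternalBlockBytes = blockGroupBytes / dataBlockNum;
    System.out.println(approxInternalBlockBytes); // 134217728, i.e. 128 MB
  }
}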

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 4308278,dea31c4..bf11914
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@@ -19,19 -19,21 +19,20 @@@ package org.apache.hadoop.hdfs.server.b
  
  import java.util.LinkedList;
  
 -import org.apache.hadoop.classification.InterfaceAudience;
  import org.apache.hadoop.hdfs.protocol.Block;
- import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
  import org.apache.hadoop.util.LightWeightGSet;
  
  /**
 - * BlockInfo class maintains for a given block
 - * the {@link BlockCollection} it is part of and datanodes where the replicas of
 - * the block are stored.
 + * For a given block (or an erasure coding block group), BlockInfo class
 + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
 + * where the replicas of the block, or blocks belonging to the erasure coding
 + * block group, are stored.
   */
 -@InterfaceAudience.Private
 -public abstract class  BlockInfo extends Block
 +public abstract class BlockInfo extends Block
      implements LightWeightGSet.LinkedElement {
    public static final BlockInfo[] EMPTY_ARRAY = {};
+ 
    private BlockCollection bc;
  
    /** For implementing {@link LightWeightGSet.LinkedElement} interface */
@@@ -177,27 -188,7 +178,12 @@@
     */
    abstract void replaceBlock(BlockInfo newBlock);
  
 +  public abstract boolean isStriped();
 +
 +  /** @return true if there is no datanode storage associated with the block */
 +  abstract boolean hasNoStorage();
 +
    /**
-    * Find specified DatanodeDescriptor.
-    * @return index or -1 if not found.
-    */
-   boolean findDatanode(DatanodeDescriptor dn) {
-     int len = getCapacity();
-     for (int idx = 0; idx < len; idx++) {
-       DatanodeDescriptor cur = getDatanode(idx);
-       if(cur == dn) {
-         return true;
-       }
-     }
-     return false;
-   }
- 
-   /**
     * Find specified DatanodeStorageInfo.
     * @return DatanodeStorageInfo or null if not found.
     */
@@@ -303,27 -294,43 +289,26 @@@
  
    /**
     * BlockInfo represents a block that is not being constructed.
 -   * In order to start modifying the block, the BlockInfo should be converted
 -   * to {@link BlockInfoContiguousUnderConstruction}.
 +   * In order to start modifying the block, the BlockInfo should be converted to
-    * {@link BlockInfoUnderConstructionContiguous} or
-    * {@link BlockInfoUnderConstructionStriped}.
-    * @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
++   * {@link BlockInfoContiguousUnderConstruction} or
++   * {@link BlockInfoStripedUnderConstruction}.
+    * @return {@link BlockUCState#COMPLETE}
     */
-   public HdfsServerConstants.BlockUCState getBlockUCState() {
-     return HdfsServerConstants.BlockUCState.COMPLETE;
+   public BlockUCState getBlockUCState() {
+     return BlockUCState.COMPLETE;
    }
  
    /**
     * Is this block complete?
     *
-    * @return true if the state of the block is
-    *         {@link HdfsServerConstants.BlockUCState#COMPLETE}
+    * @return true if the state of the block is {@link BlockUCState#COMPLETE}
     */
    public boolean isComplete() {
-     return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE);
+     return getBlockUCState().equals(BlockUCState.COMPLETE);
    }
  
 -  /**
 -   * Convert a complete block to an under construction block.
 -   * @return BlockInfoUnderConstruction -  an under construction block.
 -   */
 -  public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
 -      BlockUCState s, DatanodeStorageInfo[] targets) {
 -    if(isComplete()) {
 -      BlockInfoContiguousUnderConstruction ucBlock =
 -          new BlockInfoContiguousUnderConstruction(this,
 -          getBlockCollection().getPreferredBlockReplication(), s, targets);
 -      ucBlock.setBlockCollection(getBlockCollection());
 -      return ucBlock;
 -    }
 -    // the block is already under construction
 -    BlockInfoContiguousUnderConstruction ucBlock =
 -        (BlockInfoContiguousUnderConstruction)this;
 -    ucBlock.setBlockUCState(s);
 -    ucBlock.setExpectedLocations(targets);
 -    ucBlock.setBlockCollection(getBlockCollection());
 -    return ucBlock;
 +  public boolean isDeleted() {
 +    return (bc == null);
    }
  
    @Override
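
The BlockInfo hunks above add an abstract isStriped() (and hasNoStorage()) to BlockInfo, with BlockInfoContiguous answering false, so callers can distinguish striped block groups from contiguous blocks without instanceof checks on the concrete type. A toy sketch of that shape follows; the classes and the expectedStorages() helper are hypothetical and only show one way a caller might use the hook.

/** Toy model of the abstract isStriped() hook; not the HDFS types. */
public class BlockInfoSketch {

  abstract static class AbstractBlock {
    abstract boolean isStriped();
  }

  static class ContiguousBlock extends AbstractBlock {
    @Override
    boolean isStriped() {
      return false;
    }
  }

  static class StripedBlock extends AbstractBlock {
    final int dataUnits;
    final int parityUnits;

    StripedBlock(int dataUnits, int parityUnits) {
      this.dataUnits = dataUnits;
      this.parityUnits = parityUnits;
    }

    @Override
    boolean isStriped() {
      return true;
    }
  }

  /** Hypothetical caller: expected storage count differs for the two layouts. */
  static int expectedStorages(AbstractBlock b, int replication) {
    if (b.isStriped()) {
      StripedBlock s = (StripedBlock) b;
      return s.dataUnits + s.parityUnits; // one storage per internal block
    }
    return replication;                   // one storage per replica
  }

  public static void main(String[] args) {
    System.out.println(expectedStorages(new ContiguousBlock(), 3));  // 3
    System.out.println(expectedStorages(new StripedBlock(6, 3), 3)); // 9
  }
}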

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index d9adccc,eff89a8..bb9bf5b
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@@ -122,36 -120,4 +122,36 @@@ public class BlockInfoContiguous extend
            "newBlock already exists.";
      }
    }
 +
 +  /**
 +   * Convert a complete block to an under construction block.
 +   * @return BlockInfoUnderConstruction -  an under construction block.
 +   */
-   public BlockInfoUnderConstructionContiguous convertToBlockUnderConstruction(
++  public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
 +      BlockUCState s, DatanodeStorageInfo[] targets) {
 +    if(isComplete()) {
-       BlockInfoUnderConstructionContiguous ucBlock =
-           new BlockInfoUnderConstructionContiguous(this,
++      BlockInfoContiguousUnderConstruction ucBlock =
++          new BlockInfoContiguousUnderConstruction(this,
 +          getBlockCollection().getPreferredBlockReplication(), s, targets);
 +      ucBlock.setBlockCollection(getBlockCollection());
 +      return ucBlock;
 +    }
 +    // the block is already under construction
-     BlockInfoUnderConstructionContiguous ucBlock =
-         (BlockInfoUnderConstructionContiguous) this;
++    BlockInfoContiguousUnderConstruction ucBlock =
++        (BlockInfoContiguousUnderConstruction) this;
 +    ucBlock.setBlockUCState(s);
 +    ucBlock.setExpectedLocations(targets);
 +    ucBlock.setBlockCollection(getBlockCollection());
 +    return ucBlock;
 +  }
 +
 +  @Override
 +  public final boolean isStriped() {
 +    return false;
 +  }
 +
 +  @Override
 +  final boolean hasNoStorage() {
 +    return getStorageInfo(0) == null;
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index 0000000,7ca6419..96b209d
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@@ -1,0 -1,403 +1,281 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.List;
+ 
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+ import org.apache.hadoop.hdfs.server.namenode.NameNode;
+ 
+ /**
+  * Represents a block that is currently being constructed.<br>
+  * This is usually the last block of a file opened for write or append.
+  */
 -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
++public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous
++    implements BlockInfoUnderConstruction{
+   /** Block state. See {@link BlockUCState} */
+   private BlockUCState blockUCState;
+ 
+   /**
+    * Block replicas as assigned when the block was allocated.
+    * This defines the pipeline order.
+    */
+   private List<ReplicaUnderConstruction> replicas;
+ 
+   /**
+    * Index of the primary data node doing the recovery. Useful for log
+    * messages.
+    */
+   private int primaryNodeIndex = -1;
+ 
+   /**
+    * The new generation stamp, which this block will have
+    * after the recovery succeeds. Also used as a recovery id to identify
+    * the right recovery if any of the abandoned recoveries re-appear.
+    */
+   private long blockRecoveryId = 0;
+ 
+   /**
+    * The block source to use in the event of copy-on-write truncate.
+    */
+   private Block truncateBlock;
+ 
+   /**
 -   * ReplicaUnderConstruction contains information about replicas while
 -   * they are under construction.
 -   * The GS, the length and the state of the replica is as reported by 
 -   * the data-node.
 -   * It is not guaranteed, but expected, that data-nodes actually have
 -   * corresponding replicas.
 -   */
 -  static class ReplicaUnderConstruction extends Block {
 -    private final DatanodeStorageInfo expectedLocation;
 -    private ReplicaState state;
 -    private boolean chosenAsPrimary;
 -
 -    ReplicaUnderConstruction(Block block,
 -                             DatanodeStorageInfo target,
 -                             ReplicaState state) {
 -      super(block);
 -      this.expectedLocation = target;
 -      this.state = state;
 -      this.chosenAsPrimary = false;
 -    }
 -
 -    /**
 -     * Expected block replica location as assigned when the block was allocated.
 -     * This defines the pipeline order.
 -     * It is not guaranteed, but expected, that the data-node actually has
 -     * the replica.
 -     */
 -    private DatanodeStorageInfo getExpectedStorageLocation() {
 -      return expectedLocation;
 -    }
 -
 -    /**
 -     * Get replica state as reported by the data-node.
 -     */
 -    ReplicaState getState() {
 -      return state;
 -    }
 -
 -    /**
 -     * Whether the replica was chosen for recovery.
 -     */
 -    boolean getChosenAsPrimary() {
 -      return chosenAsPrimary;
 -    }
 -
 -    /**
 -     * Set replica state.
 -     */
 -    void setState(ReplicaState s) {
 -      state = s;
 -    }
 -
 -    /**
 -     * Set whether this replica was chosen for recovery.
 -     */
 -    void setChosenAsPrimary(boolean chosenAsPrimary) {
 -      this.chosenAsPrimary = chosenAsPrimary;
 -    }
 -
 -    /**
 -     * Is data-node the replica belongs to alive.
 -     */
 -    boolean isAlive() {
 -      return expectedLocation.getDatanodeDescriptor().isAlive;
 -    }
 -
 -    @Override // Block
 -    public int hashCode() {
 -      return super.hashCode();
 -    }
 -
 -    @Override // Block
 -    public boolean equals(Object obj) {
 -      // Sufficient to rely on super's implementation
 -      return (this == obj) || super.equals(obj);
 -    }
 -
 -    @Override
 -    public String toString() {
 -      final StringBuilder b = new StringBuilder(50);
 -      appendStringTo(b);
 -      return b.toString();
 -    }
 -    
 -    @Override
 -    public void appendStringTo(StringBuilder sb) {
 -      sb.append("ReplicaUC[")
 -        .append(expectedLocation)
 -        .append("|")
 -        .append(state)
 -        .append("]");
 -    }
 -  }
 -
 -  /**
+    * Create block and set its state to
+    * {@link BlockUCState#UNDER_CONSTRUCTION}.
+    */
+   public BlockInfoContiguousUnderConstruction(Block blk, short replication) {
+     this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
+   }
+ 
+   /**
+    * Create a block that is currently being constructed.
+    */
+   public BlockInfoContiguousUnderConstruction(Block blk, short replication,
+       BlockUCState state, DatanodeStorageInfo[] targets) {
+     super(blk, replication);
+     assert getBlockUCState() != BlockUCState.COMPLETE :
 -      "BlockInfoUnderConstruction cannot be in COMPLETE state";
++      "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state";
+     this.blockUCState = state;
+     setExpectedLocations(targets);
+   }
+ 
 -  /**
 -   * Convert an under construction block to a complete block.
 -   * 
 -   * @return BlockInfo - a complete block.
 -   * @throws IOException if the state of the block 
 -   * (the generation stamp and the length) has not been committed by 
 -   * the client or it does not have at least a minimal number of replicas 
 -   * reported from data-nodes. 
 -   */
 -  BlockInfo convertToCompleteBlock() throws IOException {
++  @Override
++  public BlockInfoContiguous convertToCompleteBlock() throws IOException {
+     assert getBlockUCState() != BlockUCState.COMPLETE :
+       "Trying to convert a COMPLETE block";
+     return new BlockInfoContiguous(this);
+   }
+ 
 -  /** Set expected locations */
++  @Override
+   public void setExpectedLocations(DatanodeStorageInfo[] targets) {
+     int numLocations = targets == null ? 0 : targets.length;
 -    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
 -    for(int i = 0; i < numLocations; i++)
 -      replicas.add(
 -        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
++    this.replicas = new ArrayList<>(numLocations);
++    for(int i = 0; i < numLocations; i++) {
++      replicas.add(new ReplicaUnderConstruction(this, targets[i],
++          ReplicaState.RBW));
++    }
+   }
+ 
 -  /**
 -   * Create array of expected replica locations
 -   * (as has been assigned by chooseTargets()).
 -   */
++  @Override
+   public DatanodeStorageInfo[] getExpectedStorageLocations() {
+     int numLocations = replicas == null ? 0 : replicas.size();
+     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
 -    for(int i = 0; i < numLocations; i++)
++    for (int i = 0; i < numLocations; i++) {
+       storages[i] = replicas.get(i).getExpectedStorageLocation();
++    }
+     return storages;
+   }
+ 
 -  /** Get the number of expected locations */
++  @Override
+   public int getNumExpectedLocations() {
+     return replicas == null ? 0 : replicas.size();
+   }
+ 
+   /**
+    * Return the state of the block under construction.
+    * @see BlockUCState
+    */
+   @Override // BlockInfo
+   public BlockUCState getBlockUCState() {
+     return blockUCState;
+   }
+ 
+   void setBlockUCState(BlockUCState s) {
+     blockUCState = s;
+   }
+ 
 -  /** Get block recovery ID */
++  @Override
+   public long getBlockRecoveryId() {
+     return blockRecoveryId;
+   }
+ 
 -  /** Get recover block */
++  @Override
+   public Block getTruncateBlock() {
+     return truncateBlock;
+   }
+ 
++  @Override
++  public Block toBlock() {
++    return this;
++  }
++
+   public void setTruncateBlock(Block recoveryBlock) {
+     this.truncateBlock = recoveryBlock;
+   }
+ 
 -  /**
 -   * Process the recorded replicas. When about to commit or finish the
 -   * pipeline recovery sort out bad replicas.
 -   * @param genStamp  The final generation stamp for the block.
 -   */
++  @Override
+   public void setGenerationStampAndVerifyReplicas(long genStamp) {
+     // Set the generation stamp for the block.
+     setGenerationStamp(genStamp);
+     if (replicas == null)
+       return;
+ 
+     // Remove the replicas with wrong gen stamp.
+     // The replica list is unchanged.
+     for (ReplicaUnderConstruction r : replicas) {
+       if (genStamp != r.getGenerationStamp()) {
+         r.getExpectedStorageLocation().removeBlock(this);
+         NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica "
+             + "from location: {}", r.getExpectedStorageLocation());
+       }
+     }
+   }
+ 
 -  /**
 -   * Commit block's length and generation stamp as reported by the client.
 -   * Set block state to {@link BlockUCState#COMMITTED}.
 -   * @param block - contains client reported block length and generation 
 -   * @throws IOException if block ids are inconsistent.
 -   */
 -  void commitBlock(Block block) throws IOException {
++  @Override
++  public void commitBlock(Block block) throws IOException {
+     if(getBlockId() != block.getBlockId())
+       throw new IOException("Trying to commit inconsistent block: id = "
+           + block.getBlockId() + ", expected id = " + getBlockId());
+     blockUCState = BlockUCState.COMMITTED;
+     this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+     // Sort out invalid replicas.
+     setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
+   }
+ 
 -  /**
 -   * Initialize lease recovery for this block.
 -   * Find the first alive data-node starting from the previous primary and
 -   * make it primary.
 -   */
++  @Override
+   public void initializeBlockRecovery(long recoveryId) {
+     setBlockUCState(BlockUCState.UNDER_RECOVERY);
+     blockRecoveryId = recoveryId;
+     if (replicas.size() == 0) {
+       NameNode.blockStateChangeLog.warn("BLOCK*"
 -        + " BlockInfoUnderConstruction.initLeaseRecovery:"
++        + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:"
+         + " No blocks found, lease removed.");
+     }
+     boolean allLiveReplicasTriedAsPrimary = true;
 -    for (int i = 0; i < replicas.size(); i++) {
++    for (ReplicaUnderConstruction replica : replicas) {
+       // Check if all replicas have been tried or not.
 -      if (replicas.get(i).isAlive()) {
 -        allLiveReplicasTriedAsPrimary =
 -            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
++      if (replica.isAlive()) {
++        allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
++            replica.getChosenAsPrimary());
+       }
+     }
+     if (allLiveReplicasTriedAsPrimary) {
+       // Just set all the replicas to be chosen whether they are alive or not.
 -      for (int i = 0; i < replicas.size(); i++) {
 -        replicas.get(i).setChosenAsPrimary(false);
++      for (ReplicaUnderConstruction replica : replicas) {
++        replica.setChosenAsPrimary(false);
+       }
+     }
+     long mostRecentLastUpdate = 0;
+     ReplicaUnderConstruction primary = null;
+     primaryNodeIndex = -1;
+     for(int i = 0; i < replicas.size(); i++) {
+       // Skip alive replicas which have been chosen for recovery.
+       if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+         continue;
+       }
+       final ReplicaUnderConstruction ruc = replicas.get(i);
+       final long lastUpdate = ruc.getExpectedStorageLocation()
+           .getDatanodeDescriptor().getLastUpdateMonotonic();
+       if (lastUpdate > mostRecentLastUpdate) {
+         primaryNodeIndex = i;
+         primary = ruc;
+         mostRecentLastUpdate = lastUpdate;
+       }
+     }
+     if (primary != null) {
 -      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
++      primary.getExpectedStorageLocation().getDatanodeDescriptor()
++          .addBlockToBeRecovered(this);
+       primary.setChosenAsPrimary(true);
+       NameNode.blockStateChangeLog.debug(
+           "BLOCK* {} recovery started, primary={}", this, primary);
+     }
+   }
+ 
 -  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
 -                     Block block,
 -                     ReplicaState rState) {
++  @Override
++  public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
++      Block block, ReplicaState rState) {
+     Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+     while (it.hasNext()) {
+       ReplicaUnderConstruction r = it.next();
+       DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
+       if(expectedLocation == storage) {
+         // Record the gen stamp from the report
+         r.setGenerationStamp(block.getGenerationStamp());
+         return;
+       } else if (expectedLocation != null &&
+                  expectedLocation.getDatanodeDescriptor() ==
+                      storage.getDatanodeDescriptor()) {
+ 
+         // The Datanode reported that the block is on a different storage
+         // than the one chosen by BlockPlacementPolicy. This can occur as
+         // we allow Datanodes to choose the target storage. Update our
+         // state by removing the stale entry and adding a new one.
+         it.remove();
+         break;
+       }
+     }
+     replicas.add(new ReplicaUnderConstruction(block, storage, rState));
+   }
+ 
 -  @Override // BlockInfo
 -  // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
 -  public int hashCode() {
 -    return super.hashCode();
 -  }
 -
 -  @Override // BlockInfo
 -  public boolean equals(Object obj) {
 -    // Sufficient to rely on super's implementation
 -    return (this == obj) || super.equals(obj);
 -  }
 -
+   @Override
+   public String toString() {
+     final StringBuilder b = new StringBuilder(100);
+     appendStringTo(b);
+     return b.toString();
+   }
+ 
+   @Override
+   public void appendStringTo(StringBuilder sb) {
+     super.appendStringTo(sb);
+     appendUCParts(sb);
+   }
+ 
+   private void appendUCParts(StringBuilder sb) {
+     sb.append("{UCState=").append(blockUCState)
+       .append(", truncateBlock=" + truncateBlock)
+       .append(", primaryNodeIndex=").append(primaryNodeIndex)
+       .append(", replicas=[");
+     if (replicas != null) {
+       Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+       if (iter.hasNext()) {
+         iter.next().appendStringTo(sb);
+         while (iter.hasNext()) {
+           sb.append(", ");
+           iter.next().appendStringTo(sb);
+         }
+       }
+     }
+     sb.append("]}");
+   }
+ }
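
The primary selection in initializeBlockRecovery() above reduces to two steps: if every live replica has already been tried as primary, clear the chosenAsPrimary flags so the rotation starts over; then pick the alive, not-yet-chosen replica whose datanode sent the most recent heartbeat. Below is a minimal standalone sketch of just that rule, using a simplified Replica stand-in for ReplicaUnderConstruction and its DatanodeDescriptor; the class names, fields and sample heartbeat values are illustrative only and not part of the patch.

import java.util.Arrays;
import java.util.List;

public class PrimarySelectionSketch {
  static class Replica {
    final String name;
    final boolean alive;
    boolean chosenAsPrimary;
    final long lastUpdateMonotonic; // stand-in for the datanode's last heartbeat

    Replica(String name, boolean alive, boolean chosen, long lastUpdate) {
      this.name = name;
      this.alive = alive;
      this.chosenAsPrimary = chosen;
      this.lastUpdateMonotonic = lastUpdate;
    }
  }

  static Replica choosePrimary(List<Replica> replicas) {
    // If every live replica has already been tried as primary, reset the
    // flags so the next recovery attempt can cycle through them again.
    boolean allLiveTried = true;
    for (Replica r : replicas) {
      if (r.alive) {
        allLiveTried = allLiveTried && r.chosenAsPrimary;
      }
    }
    if (allLiveTried) {
      for (Replica r : replicas) {
        r.chosenAsPrimary = false;
      }
    }
    // Pick the alive, not-yet-chosen replica with the freshest heartbeat.
    Replica primary = null;
    long mostRecentLastUpdate = 0;
    for (Replica r : replicas) {
      if (!(r.alive && !r.chosenAsPrimary)) {
        continue; // skip dead replicas and replicas already tried
      }
      if (r.lastUpdateMonotonic > mostRecentLastUpdate) {
        primary = r;
        mostRecentLastUpdate = r.lastUpdateMonotonic;
      }
    }
    if (primary != null) {
      primary.chosenAsPrimary = true;
    }
    return primary;
  }

  public static void main(String[] args) {
    List<Replica> replicas = Arrays.asList(
        new Replica("dn1", true, false, 100L),
        new Replica("dn2", true, false, 250L),   // freshest live heartbeat
        new Replica("dn3", false, false, 900L)); // dead, skipped
    System.out.println(choosePrimary(replicas).name); // prints dn2
  }
}

Running the sketch picks dn2 first; a subsequent call would fall back to dn1 since dn2 is then marked as chosen, which matches the rotation behaviour the patch relies on during lease recovery.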

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index b88b554,0000000..14d2fcc
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,279 -1,0 +1,279 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.blockmanagement;
 +
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 +import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 +
 +/**
 + * Subclass of {@link BlockInfo}, representing a block group in erasure coding.
 + *
 + * We still use triplets to store DatanodeStorageInfo for each block in the
 + * block group, as well as the previous/next block in the corresponding
 + * DatanodeStorageInfo. For an (m+k) block group, the first (m+k) triplet units
 + * are sorted and strictly mapped to the corresponding block.
 + *
 + * Normally each block belonging to the group is stored in only one DataNode.
 + * However, it is possible that some block is over-replicated, so the triplet
 + * array's size can be larger than (m+k). Thus currently we use an extra byte
 + * array to record the block index for each triplet.
 + */
 +public class BlockInfoStriped extends BlockInfo {
 +  private final ErasureCodingPolicy ecPolicy;
 +  /**
 +   * Always the same size as the triplets array. Records the block index for
 +   * each triplet. TODO: actually this is only necessary for over-replicated
 +   * blocks, thus it can be further optimized to save memory usage.
 +   */
 +  private byte[] indices;
 +
 +  public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
 +    super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
 +    indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
 +    initIndices();
 +    this.ecPolicy = ecPolicy;
 +  }
 +
 +  BlockInfoStriped(BlockInfoStriped b) {
 +    this(b, b.getErasureCodingPolicy());
 +    this.setBlockCollection(b.getBlockCollection());
 +  }
 +
 +  public short getTotalBlockNum() {
 +    return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
 +  }
 +
 +  public short getDataBlockNum() {
 +    return (short) ecPolicy.getNumDataUnits();
 +  }
 +
 +  public short getParityBlockNum() {
 +    return (short) ecPolicy.getNumParityUnits();
 +  }
 +
 +  /**
 +   * If the block is committed/completed and its length is less than a full
 +   * stripe, it returns the number of actual data blocks.
 +   * Otherwise it returns the number of data units specified by the erasure coding policy.
 +   */
 +  public short getRealDataBlockNum() {
 +    if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
 +      return (short) Math.min(getDataBlockNum(),
 +          (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
 +    } else {
 +      return getDataBlockNum();
 +    }
 +  }
 +
 +  public short getRealTotalBlockNum() {
 +    return (short) (getRealDataBlockNum() + getParityBlockNum());
 +  }
 +
 +  public ErasureCodingPolicy getErasureCodingPolicy() {
 +    return ecPolicy;
 +  }
 +
 +  private void initIndices() {
 +    for (int i = 0; i < indices.length; i++) {
 +      indices[i] = -1;
 +    }
 +  }
 +
 +  private int findSlot() {
 +    int i = getTotalBlockNum();
 +    for (; i < getCapacity(); i++) {
 +      if (getStorageInfo(i) == null) {
 +        return i;
 +      }
 +    }
 +    // need to expand the triplet size
 +    ensureCapacity(i + 1, true);
 +    return i;
 +  }
 +
 +  @Override
 +  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
 +    int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
 +    int index = blockIndex;
 +    DatanodeStorageInfo old = getStorageInfo(index);
 +    if (old != null && !old.equals(storage)) { // over replicated
 +      // check if the storage has been stored
 +      int i = findStorageInfo(storage);
 +      if (i == -1) {
 +        index = findSlot();
 +      } else {
 +        return true;
 +      }
 +    }
 +    addStorage(storage, index, blockIndex);
 +    return true;
 +  }
 +
 +  private void addStorage(DatanodeStorageInfo storage, int index,
 +      int blockIndex) {
 +    setStorageInfo(index, storage);
 +    setNext(index, null);
 +    setPrevious(index, null);
 +    indices[index] = (byte) blockIndex;
 +  }
 +
 +  private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
 +    final int len = getCapacity();
 +    for(int idx = len - 1; idx >= 0; idx--) {
 +      DatanodeStorageInfo cur = getStorageInfo(idx);
 +      if (storage.equals(cur)) {
 +        return idx;
 +      }
 +    }
 +    return -1;
 +  }
 +
 +  int getStorageBlockIndex(DatanodeStorageInfo storage) {
 +    int i = this.findStorageInfo(storage);
 +    return i == -1 ? -1 : indices[i];
 +  }
 +
 +  /**
 +   * Identify the block stored in the given datanode storage. Note that
 +   * the returned block has the same block Id with the one seen/reported by the
 +   * DataNode.
 +   */
 +  Block getBlockOnStorage(DatanodeStorageInfo storage) {
 +    int index = getStorageBlockIndex(storage);
 +    if (index < 0) {
 +      return null;
 +    } else {
 +      Block block = new Block(this);
 +      block.setBlockId(this.getBlockId() + index);
 +      return block;
 +    }
 +  }
 +
 +  @Override
 +  boolean removeStorage(DatanodeStorageInfo storage) {
 +    int dnIndex = findStorageInfoFromEnd(storage);
 +    if (dnIndex < 0) { // the node is not found
 +      return false;
 +    }
 +    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
 +        "Block is still in the list and must be removed first.";
 +    // set the triplet to null
 +    setStorageInfo(dnIndex, null);
 +    setNext(dnIndex, null);
 +    setPrevious(dnIndex, null);
 +    indices[dnIndex] = -1;
 +    return true;
 +  }
 +
 +  private void ensureCapacity(int totalSize, boolean keepOld) {
 +    if (getCapacity() < totalSize) {
 +      Object[] old = triplets;
 +      byte[] oldIndices = indices;
 +      triplets = new Object[totalSize * 3];
 +      indices = new byte[totalSize];
 +      initIndices();
 +
 +      if (keepOld) {
 +        System.arraycopy(old, 0, triplets, 0, old.length);
 +        System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  void replaceBlock(BlockInfo newBlock) {
 +    assert newBlock instanceof BlockInfoStriped;
 +    BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
 +    final int size = getCapacity();
 +    newBlockGroup.ensureCapacity(size, false);
 +    for (int i = 0; i < size; i++) {
 +      final DatanodeStorageInfo storage = this.getStorageInfo(i);
 +      if (storage != null) {
 +        final int blockIndex = indices[i];
 +        final boolean removed = storage.removeBlock(this);
 +        assert removed : "currentBlock not found.";
 +
 +        newBlockGroup.addStorage(storage, i, blockIndex);
 +        storage.insertToList(newBlockGroup);
 +      }
 +    }
 +  }
 +
 +  public long spaceConsumed() {
 +    // For striped blocks, the total usage should be the sum over data and
 +    // parity blocks, because `getNumBytes` only covers the actual data size.
 +    return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
 +        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
 +        BLOCK_STRIPED_CELL_SIZE);
 +  }
 +
 +  @Override
 +  public final boolean isStriped() {
 +    return true;
 +  }
 +
 +  @Override
 +  public int numNodes() {
 +    assert this.triplets != null : "BlockInfo is not initialized";
 +    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
 +    int num = 0;
 +    for (int idx = getCapacity()-1; idx >= 0; idx--) {
 +      if (getStorageInfo(idx) != null) {
 +        num++;
 +      }
 +    }
 +    return num;
 +  }
 +
 +  /**
 +   * Convert a complete block to an under construction block.
 +   * @return BlockInfoStripedUnderConstruction - an under construction block.
 +   */
-   public BlockInfoUnderConstructionStriped convertToBlockUnderConstruction(
++  public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
 +      BlockUCState s, DatanodeStorageInfo[] targets) {
-     final BlockInfoUnderConstructionStriped ucBlock;
++    final BlockInfoStripedUnderConstruction ucBlock;
 +    if(isComplete()) {
-       ucBlock = new BlockInfoUnderConstructionStriped(this, ecPolicy,
++      ucBlock = new BlockInfoStripedUnderConstruction(this, ecPolicy,
 +          s, targets);
 +      ucBlock.setBlockCollection(getBlockCollection());
 +    } else {
 +      // the block is already under construction
-       ucBlock = (BlockInfoUnderConstructionStriped) this;
++      ucBlock = (BlockInfoStripedUnderConstruction) this;
 +      ucBlock.setBlockUCState(s);
 +      ucBlock.setExpectedLocations(targets);
 +      ucBlock.setBlockCollection(getBlockCollection());
 +    }
 +    return ucBlock;
 +  }
 +
 +  @Override
 +  final boolean hasNoStorage() {
 +    final int len = getCapacity();
 +    for(int idx = 0; idx < len; idx++) {
 +      if (getStorageInfo(idx) != null) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +}
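
getRealDataBlockNum() above caps the number of data blocks that actually hold bytes: for a committed or completed block group shorter than a full stripe it is the ceiling of the block length over the cell size, otherwise the full number of data units from the erasure coding policy. The following standalone sketch only reproduces that arithmetic; the 64 KB cell size, the 6+3 layout and the class name are illustrative assumptions, not values taken from the patch.

public class RealDataBlockNumSketch {
  // Illustrative figures only: a 64 KB cell and a 6-data/3-parity layout.
  static final long CELL_SIZE = 64 * 1024;
  static final int DATA_UNITS = 6;
  static final int PARITY_UNITS = 3;

  // Mirrors the expression in getRealDataBlockNum(): ceiling division of the
  // block length by the cell size, capped at the number of data units.
  static short realDataBlockNum(long numBytes) {
    return (short) Math.min(DATA_UNITS, (numBytes - 1) / CELL_SIZE + 1);
  }

  public static void main(String[] args) {
    // 100 KB spans two 64 KB cells, so only 2 of the 6 data blocks hold data.
    System.out.println(realDataBlockNum(100L * 1024));     // 2
    // A full stripe (6 * 64 KB) or longer uses all 6 data blocks.
    System.out.println(realDataBlockNum(6L * 64 * 1024));  // 6
    // getRealTotalBlockNum() would then add the parity blocks on top.
    System.out.println(realDataBlockNum(100L * 1024) + PARITY_UNITS); // 5
  }
}

Under these assumptions a 100 KB block group needs only 2 real data blocks plus the 3 parity blocks; spaceConsumed() similarly has to account for parity on top of getNumBytes(), which only covers the data.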

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
index 0000000,0000000..9de8294
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@@ -1,0 -1,0 +1,297 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.hdfs.protocol.Block;
++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
++import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
++import org.apache.hadoop.hdfs.server.namenode.NameNode;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++
++import java.io.IOException;
++
++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
++import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
++
++/**
++ * Represents a striped block that is currently being constructed.
++ * This is usually the last block of a file opened for write or append.
++ */
++public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
++    implements BlockInfoUnderConstruction {
++  private BlockUCState blockUCState;
++
++  /**
++   * Block replicas as assigned when the block was allocated.
++   */
++  private ReplicaUnderConstruction[] replicas;
++
++  /**
++   * Index of the primary data node doing the recovery. Useful for log
++   * messages.
++   */
++  private int primaryNodeIndex = -1;
++
++  /**
++   * The new generation stamp, which this block will have
++   * after the recovery succeeds. Also used as a recovery id to identify
++   * the right recovery if any of the abandoned recoveries re-appear.
++   */
++  private long blockRecoveryId = 0;
++
++  /**
++   * Constructor with null storage targets.
++   */
++  public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy) {
++    this(blk, ecPolicy, UNDER_CONSTRUCTION, null);
++  }
++
++  /**
++   * Create a striped block that is currently being constructed.
++   */
++  public BlockInfoStripedUnderConstruction(Block blk, ErasureCodingPolicy ecPolicy,
++      BlockUCState state, DatanodeStorageInfo[] targets) {
++    super(blk, ecPolicy);
++    assert getBlockUCState() != COMPLETE :
++      "BlockInfoStripedUnderConstruction cannot be in COMPLETE state";
++    this.blockUCState = state;
++    setExpectedLocations(targets);
++  }
++
++  @Override
++  public BlockInfoStriped convertToCompleteBlock() throws IOException {
++    assert getBlockUCState() != COMPLETE :
++      "Trying to convert a COMPLETE block";
++    return new BlockInfoStriped(this);
++  }
++
++  /** Set expected locations */
++  @Override
++  public void setExpectedLocations(DatanodeStorageInfo[] targets) {
++    int numLocations = targets == null ? 0 : targets.length;
++    this.replicas = new ReplicaUnderConstruction[numLocations];
++    for(int i = 0; i < numLocations; i++) {
++      // when creating a new block we simply assign block indices
++      // sequentially to each storage
++      Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp());
++      replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
++          ReplicaState.RBW);
++    }
++  }
++
++  /**
++   * Create array of expected replica locations
++   * (as has been assigned by chooseTargets()).
++   */
++  @Override
++  public DatanodeStorageInfo[] getExpectedStorageLocations() {
++    int numLocations = getNumExpectedLocations();
++    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
++    for (int i = 0; i < numLocations; i++) {
++      storages[i] = replicas[i].getExpectedStorageLocation();
++    }
++    return storages;
++  }
++
++  /** @return the index array indicating the block index in each storage */
++  public int[] getBlockIndices() {
++    int numLocations = getNumExpectedLocations();
++    int[] indices = new int[numLocations];
++    for (int i = 0; i < numLocations; i++) {
++      indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
++    }
++    return indices;
++  }
++
++  @Override
++  public int getNumExpectedLocations() {
++    return replicas == null ? 0 : replicas.length;
++  }
++
++  /**
++   * Return the state of the block under construction.
++   * @see BlockUCState
++   */
++  @Override // BlockInfo
++  public BlockUCState getBlockUCState() {
++    return blockUCState;
++  }
++
++  void setBlockUCState(BlockUCState s) {
++    blockUCState = s;
++  }
++
++  @Override
++  public long getBlockRecoveryId() {
++    return blockRecoveryId;
++  }
++
++  @Override
++  public Block getTruncateBlock() {
++    return null;
++  }
++
++  @Override
++  public Block toBlock() {
++    return this;
++  }
++
++  @Override
++  public void setGenerationStampAndVerifyReplicas(long genStamp) {
++    // Set the generation stamp for the block.
++    setGenerationStamp(genStamp);
++    if (replicas == null)
++      return;
++
++    // Remove the replicas with wrong gen stamp.
++    // The replica list is unchanged.
++    for (ReplicaUnderConstruction r : replicas) {
++      if (genStamp != r.getGenerationStamp()) {
++        r.getExpectedStorageLocation().removeBlock(this);
++        NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
++            + "from location: {}", r.getExpectedStorageLocation());
++      }
++    }
++  }
++
++  @Override
++  public void commitBlock(Block block) throws IOException {
++    if (getBlockId() != block.getBlockId()) {
++      throw new IOException("Trying to commit inconsistent block: id = "
++          + block.getBlockId() + ", expected id = " + getBlockId());
++    }
++    blockUCState = BlockUCState.COMMITTED;
++    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
++    // Sort out invalid replicas.
++    setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
++  }
++
++  @Override
++  public void initializeBlockRecovery(long recoveryId) {
++    setBlockUCState(BlockUCState.UNDER_RECOVERY);
++    blockRecoveryId = recoveryId;
++    if (replicas == null || replicas.length == 0) {
++      NameNode.blockStateChangeLog.warn("BLOCK*" +
++          " BlockInfoStripedUnderConstruction.initLeaseRecovery:" +
++          " No blocks found, lease removed.");
++      // reset the primary node index and return.
++      primaryNodeIndex = -1;
++      return;
++    }
++    boolean allLiveReplicasTriedAsPrimary = true;
++    for (ReplicaUnderConstruction replica : replicas) {
++      // Check if all replicas have been tried or not.
++      if (replica.isAlive()) {
++        allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
++            replica.getChosenAsPrimary());
++      }
++    }
++    if (allLiveReplicasTriedAsPrimary) {
++      // Just set all the replicas to be chosen whether they are alive or not.
++      for (ReplicaUnderConstruction replica : replicas) {
++        replica.setChosenAsPrimary(false);
++      }
++    }
++    long mostRecentLastUpdate = 0;
++    ReplicaUnderConstruction primary = null;
++    primaryNodeIndex = -1;
++    for(int i = 0; i < replicas.length; i++) {
++      // Skip alive replicas which have been chosen for recovery.
++      if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
++        continue;
++      }
++      final ReplicaUnderConstruction ruc = replicas[i];
++      final long lastUpdate = ruc.getExpectedStorageLocation()
++          .getDatanodeDescriptor().getLastUpdateMonotonic();
++      if (lastUpdate > mostRecentLastUpdate) {
++        primaryNodeIndex = i;
++        primary = ruc;
++        mostRecentLastUpdate = lastUpdate;
++      }
++    }
++    if (primary != null) {
++      primary.getExpectedStorageLocation().getDatanodeDescriptor()
++          .addBlockToBeRecovered(this);
++      primary.setChosenAsPrimary(true);
++      NameNode.blockStateChangeLog.info(
++          "BLOCK* {} recovery started, primary={}", this, primary);
++    }
++  }
++
++  @Override
++  public void addReplicaIfNotPresent(DatanodeStorageInfo storage,
++      Block reportedBlock, ReplicaState rState) {
++    if (replicas == null) {
++      replicas = new ReplicaUnderConstruction[1];
++      replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);
++    } else {
++      for (int i = 0; i < replicas.length; i++) {
++        DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation();
++        if (expected == storage) {
++          replicas[i].setBlockId(reportedBlock.getBlockId());
++          replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
++          return;
++        } else if (expected != null && expected.getDatanodeDescriptor() ==
++            storage.getDatanodeDescriptor()) {
++          // The Datanode reported that the block is on a different storage
++          // than the one chosen by BlockPlacementPolicy. This can occur as
++          // we allow Datanodes to choose the target storage. Update our
++          // state by removing the stale entry and adding a new one.
++          replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
++              rState);
++          return;
++        }
++      }
++      ReplicaUnderConstruction[] newReplicas =
++          new ReplicaUnderConstruction[replicas.length + 1];
++      System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
++      newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
++          reportedBlock, storage, rState);
++      replicas = newReplicas;
++    }
++  }
++
++  @Override
++  public String toString() {
++    final StringBuilder b = new StringBuilder(100);
++    appendStringTo(b);
++    return b.toString();
++  }
++
++  @Override
++  public void appendStringTo(StringBuilder sb) {
++    super.appendStringTo(sb);
++    appendUCParts(sb);
++  }
++
++  private void appendUCParts(StringBuilder sb) {
++    sb.append("{UCState=").append(blockUCState).
++        append(", primaryNodeIndex=").append(primaryNodeIndex).
++        append(", replicas=[");
++    if (replicas != null) {
++      int i = 0;
++      for (ReplicaUnderConstruction r : replicas) {
++        r.appendStringTo(sb);
++        if (++i < replicas.length) {
++          sb.append(", ");
++        }
++      }
++    }
++    sb.append("]}");
++  }
++}
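
setExpectedLocations() above seeds one internal block per target storage, giving the i-th storage a block whose id is the group's block id plus i and whose length starts at 0; the position of a later-reported internal block within the group can then be recovered from its id. The sketch below illustrates that mapping with hypothetical ids and target counts; the real code recovers the index through BlockIdManager.getBlockIndex() rather than by subtracting the group id as done here.

public class StripedBlockIdSketch {
  public static void main(String[] args) {
    long groupId = 0x1000L;   // hypothetical block-group id
    long genStamp = 1001L;    // hypothetical generation stamp
    int numTargets = 9;       // e.g. 6 data + 3 parity target storages

    // Mirrors setExpectedLocations(): the i-th target storage is seeded with
    // an internal block built as new Block(getBlockId() + i, 0, genStamp).
    long[] internalIds = new long[numTargets];
    for (int i = 0; i < numTargets; i++) {
      internalIds[i] = groupId + i;
    }

    // When a datanode later reports one of these internal blocks, its index
    // within the group can be read back off the id (illustration only; the
    // patch uses BlockIdManager.getBlockIndex() for this).
    long reportedId = internalIds[4];
    int index = (int) (reportedId - groupId);
    System.out.println("reported id " + reportedId + " -> block index " + index
        + " (genStamp " + genStamp + ")");
  }
}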