You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/04/13 23:08:32 UTC

[01/12] hadoop git commit: HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 21d0ce4cc -> 6e202bac1 (forced update)


HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae8f0c12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae8f0c12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae8f0c12

Branch: refs/heads/HDFS-7285
Commit: ae8f0c12db2db9212e7e50a4219d24a3c27110e3
Parents: 38fa860
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue Apr 7 11:20:13 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:12 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  55 +++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  79 +++-
 .../hadoop/hdfs/DFSStripedInputStream.java      | 367 +++++++++++++++++++
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   2 +-
 .../hadoop/hdfs/protocol/LocatedBlock.java      |   4 +
 .../hdfs/protocol/LocatedStripedBlock.java      |   5 +
 .../blockmanagement/BlockInfoStriped.java       |   6 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  92 ++++-
 .../apache/hadoop/hdfs/TestReadStripedFile.java | 304 +++++++++++++++
 .../namenode/TestRecoverStripedBlocks.java      |  88 +----
 11 files changed, 897 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index cd06e09..2ef1d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -236,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
+  private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final Sampler<?> traceSampler;
 
   public DfsClientConf getConf() {
@@ -376,6 +377,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     if (numThreads > 0) {
       this.initThreadsNumForHedgedReads(numThreads);
     }
+    numThreads = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
+        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+    if (numThreads <= 0) {
+      LOG.warn("The value of "
+          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+          + " must be greater than 0. The current setting is " + numThreads
+          + ". Reset it to the default value "
+          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+      numThreads =
+          DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
+    }
+    this.initThreadsNumForStripedReads(numThreads);
     this.saslClient = new SaslDataTransferClient(
       conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
       TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
@@ -3148,6 +3162,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Create thread pool for parallel reading in striped layout,
+   * STRIPED_READ_THREAD_POOL, if it does not already exist.
+   * @param num Number of threads for striped reads thread pool.
+   */
+  private void initThreadsNumForStripedReads(int num) {
+    assert num > 0;
+    if (STRIPED_READ_THREAD_POOL != null) {
+      return;
+    }
+    synchronized (DFSClient.class) {
+      if (STRIPED_READ_THREAD_POOL == null) {
+        STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+            new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("stripedRead-" + threadIndex.getAndIncrement());
+            return t;
+          }
+        }, new ThreadPoolExecutor.CallerRunsPolicy() {
+          @Override
+          public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+            LOG.info("Execution for striped reading rejected, "
+                + "Executing in current thread");
+            // will run in the current thread
+            super.rejectedExecution(runnable, e);
+          }
+        });
+        STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+      }
+    }
+  }
+
   long getHedgedReadTimeout() {
     return this.hedgedReadThresholdMillis;
   }
@@ -3161,6 +3212,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return HEDGED_READ_THREAD_POOL;
   }
 
+  ThreadPoolExecutor getStripedReadsThreadPool() {
+    return STRIPED_READ_THREAD_POOL;
+  }
+
   boolean isHedgedReadsEnabled() {
     return (HEDGED_READ_THREAD_POOL != null) &&
       HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6185604..35e279d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -645,7 +645,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.reject-unresolved-dn-topology-mapping";
   public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
       false;
-  
+
+  public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE =
+      "dfs.client.striped.read.threadpool.size";
+  // With default 3+2 schema, each normal read could span 3 DNs. So this
+  // default value accommodates 6 read streams
+  public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18;
+
   // Slow io warning log threshold settings for dfsclient and datanode.
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
     "dfs.datanode.slow.io.warning.threshold.ms";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dd0f6fe..e94c41a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -93,7 +94,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
-  private final DFSClient dfsClient;
+  protected final DFSClient dfsClient;
   private AtomicBoolean closed = new AtomicBoolean(false);
   private final String src;
   private final boolean verifyChecksum;
@@ -440,7 +441,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private LocatedBlock getBlockAt(long offset) throws IOException {
+  protected LocatedBlock getBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
@@ -705,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Wraps different possible read implementations so that readBuffer can be
    * strategy-agnostic.
    */
-  private interface ReaderStrategy {
+  interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
   }
@@ -1048,7 +1049,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1090,13 +1091,42 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
+   */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
       long blockStartOffset, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
+        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+  }
+
+  /**
+   * Read data from one DataNode.
+   * @param datanode the datanode from which to read data
+   * @param block the block to read
+   * @param startInBlk the start offset (inclusive) of the read within the block
+   * @param endInBlk the end offset (inclusive) of the read within the block
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). this array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
+   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   *                          block replica
+   */
+  void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
+      byte[] buf, int[] offsets, int[] lengths,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
+    final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
 
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
@@ -1117,7 +1147,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-        int len = (int) (end - start + 1);
         reader = new BlockReaderFactory(dfsClient.getConf()).
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
@@ -1126,7 +1155,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setFileName(src).
             setBlock(block.getBlock()).
             setBlockToken(blockToken).
-            setStartOffset(start).
+            setStartOffset(startInBlk).
             setVerifyChecksum(verifyChecksum).
             setClientName(dfsClient.clientName).
             setLength(len).
@@ -1136,12 +1165,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setUserGroupInformation(dfsClient.ugi).
             setConfiguration(dfsClient.getConfiguration()).
             build();
-        int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
 
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): " +
-                                "excpected " + len + ", got " + nread);
+          if (nread != lengths[i]) { // segment length, not the total len
+            throw new IOException("truncated return from reader.read(): " +
+                "excpected " + lengths[i] + ", got " + nread);
+          }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1187,6 +1218,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
+   * This method verifies that the read portions are valid and do not overlap
+   * with each other.
+   */
+  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+    Preconditions.checkArgument(offsets.length == lengths.length &&
+        offsets.length > 0);
+    int sum = 0;
+    for (int i = 0; i < lengths.length; i++) {
+      if (i > 0) {
+        int gap = offsets[i] - offsets[i - 1];
+        // make sure read portions do not overlap with each other
+        Preconditions.checkArgument(gap >= lengths[i - 1]);
+      }
+      sum += lengths[i];
+    }
+    Preconditions.checkArgument(sum == totalLen);
+  }
+
+  /**
    * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
    * int, Map)} except we start up a second, parallel, 'hedged' read
    * if the first read is taking longer than configured amount of
@@ -1407,10 +1457,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        if (dfsClient.isHedgedReadsEnabled()) {
+        if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
           hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
           fetchBlockByteRange(blk.getStartOffset(), targetStart,
               targetStart + bytesToRead - 1, buffer, offset,
@@ -1606,7 +1655,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Utility class to encapsulate data node info and its address. */
-  private static final class DNAddrPair {
+  static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
new file mode 100644
index 0000000..077b0f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+
+/******************************************************************************
+ * DFSStripedInputStream reads from striped block groups, illustrated below:
+ *
+ * | <- Striped Block Group -> |
+ *  blk_0      blk_1       blk_2   <- A striped block group has
+ *    |          |           |          {@link #groupSize} blocks
+ *    v          v           v
+ * +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|  <- The logical read order should be
+ * +------+   +------+   +------+       cell_0, cell_1, ...
+ * |cell_3|   |cell_4|   |cell_5|
+ * +------+   +------+   +------+
+ * |cell_6|   |cell_7|   |cell_8|
+ * +------+   +------+   +------+
+ * |cell_9|
+ * +------+  <- A cell contains {@link #cellSize} bytes of data
+ *
+ * Three styles of read will eventually be supported:
+ *   1. Stateful read: TODO: HDFS-8033
+ *   2. pread without decode support
+ *     This is implemented by calculating the portion of read from each block and
+ *     issuing requests to each DataNode in parallel.
+ *   3. pread with decode support: TODO: will be supported after HDFS-7678
+ *****************************************************************************/
+public class DFSStripedInputStream extends DFSInputStream {
+  /**
+   * This method plans the read portion from each block in the stripe
+   * @param groupSize The size / width of the striping group
+   * @param cellSize The size of each striping cell
+   * @param startInBlk Starting offset in the striped block
+   * @param len Length of the read request
+   * @param bufOffset  Initial offset in the result buffer
+   * @return array of {@link ReadPortion}, each representing the portion of I/O
+   *         for an individual block in the group
+   */
+  @VisibleForTesting
+  static ReadPortion[] planReadPortions(final int groupSize,
+      final int cellSize, final long startInBlk, final int len, int bufOffset) {
+    ReadPortion[] results = new ReadPortion[groupSize];
+    for (int i = 0; i < groupSize; i++) {
+      results[i] = new ReadPortion();
+    }
+
+    // cellIdxInBlk is the index of the cell in the block
+    // E.g., cell_3 is the 2nd cell in blk_0
+    int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
+
+    // blkIdxInGroup is the index of the block in the striped block group
+    // E.g., blk_2 is the 3rd block in the group
+    final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
+    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+        startInBlk % cellSize;
+    boolean crossStripe = false;
+    for (int i = 1; i < groupSize; i++) {
+      if (blkIdxInGroup + i >= groupSize && !crossStripe) {
+        cellIdxInBlk++;
+        crossStripe = true;
+      }
+      results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
+          cellSize * cellIdxInBlk;
+    }
+
+    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+    results[blkIdxInGroup].lengths.add(firstCellLen);
+    results[blkIdxInGroup].readLength += firstCellLen;
+
+    int i = (blkIdxInGroup + 1) % groupSize;
+    for (int done = firstCellLen; done < len; done += cellSize) {
+      ReadPortion rp = results[i];
+      rp.offsetsInBuf.add(done + bufOffset);
+      final int readLen = Math.min(len - done, cellSize);
+      rp.lengths.add(readLen);
+      rp.readLength += readLen;
+      i = (i + 1) % groupSize;
+    }
+    return results;
+  }
+
+  /**
+   * This method parses a striped block group into individual blocks.
+   * @param bg The striped block group
+   * @param dataBlkNum the number of data blocks
+   * @param cellSize the size of each striping cell
+   * @return An array of the data blocks, indexed by position in the group
+   */
+  @VisibleForTesting
+  static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+      int dataBlkNum, int cellSize) {
+    int locatedBGSize = bg.getBlockIndices().length;
+    // TODO not considering missing blocks for now, only identify data blocks
+    LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
+    for (short i = 0; i < locatedBGSize; i++) {
+      final int idx = bg.getBlockIndices()[i];
+      if (idx < dataBlkNum && lbs[idx] == null) {
+        lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
+      }
+    }
+    return lbs;
+  }
+
+  private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+      int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
+    final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
+    blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
+    // TODO: fix the numBytes computation
+
+    return new LocatedBlock(blk,
+        new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+        new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+        new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+        null);
+  }
+
+
+  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
+      throws IOException {
+    super(dfsClient, src, verifyChecksum);
+    DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+  }
+
+  @Override
+  public synchronized int read(final ByteBuffer buf) throws IOException {
+    throw new UnsupportedActionException("Stateful read is not supported");
+  }
+
+  @Override
+  public synchronized int read(final byte buf[], int off, int len)
+      throws IOException {
+    throw new UnsupportedActionException("Stateful read is not supported");
+  }
+
+  /**
+   * | <--------- LocatedStripedBlock (ID = 0) ---------> |
+   * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
+   *                      ^
+   *                    offset
+   * On a striped file, the super method {@link DFSInputStream#getBlockAt}
+   * treats a striped block group as a single {@link LocatedBlock} object,
+   * which includes target in its range. This method adds the logic of:
+   *   1. Analyzing the index of required block based on offset
+   *   2. Parsing the block group to obtain the block location on that index
+   */
+  @Override
+  protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
+    LocatedBlock lb = super.getBlockAt(blkStartOffset);
+    assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
+        "LocatedStripedBlock for a striped file";
+
+    int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
+        % groupSize);
+    // If indexing information is returned, iterate through the index array
+    // to find the entry for position idx in the group
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    int i = 0;
+    for (; i < lsb.getBlockIndices().length; i++) {
+      if (lsb.getBlockIndices()[i] == idx) {
+        break;
+      }
+    }
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+          + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+    }
+    return constructInternalBlock(lsb, i, cellSize, idx);
+  }
+
+  private LocatedBlock getBlockGroupAt(long offset) throws IOException {
+    return super.getBlockAt(offset);
+  }
+
+  /**
+   * Real implementation of pread.
+   */
+  @Override
+  protected void fetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    Map<Future<Void>, Integer> futures = new HashMap<>();
+    CompletionService<Void> stripedReadsService =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    int len = (int) (end - start + 1);
+
+    // Refresh the striped block group
+    block = getBlockGroupAt(block.getStartOffset());
+    assert block instanceof LocatedStripedBlock : "NameNode" +
+        " should return a LocatedStripedBlock for a striped file";
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+
+    // Planning the portion of I/O for each shard
+    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
+        len, offset);
+
+    // Parse group to get chosen DN location
+    LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
+
+    for (short i = 0; i < groupSize; i++) {
+      ReadPortion rp = readPortions[i];
+      if (rp.readLength <= 0) {
+        continue;
+      }
+      DatanodeInfo loc = blks[i].getLocations()[0];
+      StorageType type = blks[i].getStorageTypes()[0];
+      DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+          loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
+          rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+          rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
+      Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
+      DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
+      futures.put(getFromDNRequest, (int) i);
+    }
+    while (!futures.isEmpty()) {
+      try {
+        waitNextCompletion(stripedReadsService, futures);
+      } catch (InterruptedException ie) {
+        // Ignore and retry
+      }
+    }
+  }
+
+  private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final byte[] buf, final int[] offsets, final int[] lengths,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final int hedgedReadId) {
+    final Span parentSpan = Trace.currentSpan();
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        TraceScope scope =
+            Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
+        try {
+          actualGetFromOneDataNode(datanode, block, start,
+              end, buf, offsets, lengths, corruptedBlockMap);
+        } finally {
+          scope.close();
+        }
+        return null;
+      }
+    };
+  }
+
+  private void waitNextCompletion(CompletionService<Void> stripedReadsService,
+      Map<Future<Void>, Integer> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("Futures already empty");
+    }
+    Future<Void> future = null;
+    try {
+      future = stripedReadsService.take();
+      future.get();
+      futures.remove(future);
+    } catch (ExecutionException | CancellationException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    }
+    throw new InterruptedException("let's retry");
+  }
+
+  public void setCellSize(int cellSize) {
+    this.cellSize = cellSize;
+  }
+
+  /**
+   * This class represents the portion of I/O associated with each block in the
+   * striped block group.
+   */
+  static class ReadPortion {
+    /**
+     * startOffsetInBlock
+     *     |
+     *     v
+     *     |<-lengths[0]->|<-  lengths[1]  ->|<-lengths[2]->|
+     * +------------------+------------------+----------------+
+     * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
+     * +------------------+------------------+----------------+
+     *   _/                \_______________________
+     *  |                                          |
+     *  v offsetsInBuf[0]                          v offsetsInBuf[1]
+     * +------------------------------------------------------+
+     * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|   <- buf
+     * |  (partial)  |    (from blk_1 and blk_2)   |          |
+     * +------------------------------------------------------+
+     */
+    private long startOffsetInBlock = 0;
+    private long readLength = 0;
+    private final List<Integer> offsetsInBuf = new ArrayList<>();
+    private final List<Integer> lengths = new ArrayList<>();
+
+    int[] getOffsets() {
+      int[] offsets = new int[offsetsInBuf.size()];
+      for (int i = 0; i < offsets.length; i++) {
+        offsets[i] = offsetsInBuf.get(i);
+      }
+      return offsets;
+    }
+
+    int[] getLengths() {
+      int[] lens = new int[this.lengths.size()];
+      for (int i = 0; i < lens.length; i++) {
+        lens[i] = this.lengths.get(i);
+      }
+      return lens;
+    }
+
+    long getReadLength() {
+      return readLength;
+    }
+
+    long getStartOffsetInBlock() {
+      return startOffsetInBlock;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index b2882af..a888aa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -186,5 +186,5 @@ public class HdfsConstants {
   public static final byte MAX_BLOCKS_IN_GROUP = 16;
 
   // The chunk size for striped block which is used by erasure coding
-  public static final int BLOCK_STRIPED_CHUNK_SIZE = 64 * 1024;
+  public static final int BLOCK_STRIPED_CELL_SIZE = 128 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index a38e8f2..4ba8193 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -212,5 +212,9 @@ public class LocatedBlock {
         + "; locs=" + Arrays.asList(locs)
         + "}";
   }
+
+  public boolean isStriped() {
+    return false; // base class is never striped; LocatedStripedBlock overrides
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 97e3a69..98614db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -65,4 +65,9 @@ public class LocatedStripedBlock extends LocatedBlock {
   public int[] getBlockIndices() {
     return this.blockIndices;
   }
+
+  @Override
+  public boolean isStriped() {
+    return true; // always true for this subclass
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 4a85efb..20b0c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -203,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
     // In case striped blocks, total usage by this striped blocks should
     // be the total of data blocks and parity blocks because
     // `getNumBytes` is the total of actual data block size.
-    return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
-        * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
+    return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1)
+        * BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index dc91c37..e3a85cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,12 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -107,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -122,8 +128,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -131,7 +139,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
@@ -151,12 +158,8 @@ import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -1807,4 +1810,77 @@ public class DFSTestUtil {
     reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
     return reports;
   }
+
+  /**
+   * Make {@code dir} an erasure coding zone, create {@code file} and add
+   * {@code numBlocks} block groups of {@code numStripesPerBlk} stripes to it.
+   */
+  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks, int numStripesPerBlk) throws Exception {
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String src = file.toString();
+    dfs.mkdirs(dir);
+    dfs.getClient().createErasureCodingZone(dir.toString());
+
+    FSDataOutputStream out = null;
+    try {
+      out = dfs.create(file, (short) 1); // create an empty file
+      final String client = dfs.getClient().getClientName();
+      final FSNamesystem ns = cluster.getNamesystem();
+      final INodeFile inode = ns.getFSDirectory().getINode4Write(src).asFile();
+      ExtendedBlock last = null;
+      for (int i = 0; i < numBlocks; i++) {
+        last = new ExtendedBlock(ns.getBlockPoolId(), createBlock(
+            cluster.getDataNodes(), dfs, ns, src, inode, client, last,
+            numStripesPerBlk));
+      }
+      dfs.getClient().namenode.complete(src, client, last, inode.getId());
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+
+  /**
+   * Add one block group to {@code file} and send RECEIVING_BLOCK and then
+   * RECEIVED_BLOCK IBRs for its internal blocks; sizes are computed as long.
+   */
+  static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
+      FSNamesystem ns, String file, INodeFile fileNode, String clientName,
+      ExtendedBlock previous, int numStripes) throws Exception {
+    fs.getClient().namenode.addBlock(file, clientName, previous, null,
+        fileNode.getId(), null);
+
+    final BlockInfo lastBlock = fileNode.getLastBlock();
+    final int groupSize = fileNode.getBlockReplication();
+    final long internalBlkLen = (long) numStripes * BLOCK_STRIPED_CELL_SIZE;
+    // 1. RECEIVING_BLOCK IBR
+    reportInternalBlocks(dataNodes, ns, lastBlock, groupSize, 0,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK);
+    // 2. RECEIVED_BLOCK IBR
+    reportInternalBlocks(dataNodes, ns, lastBlock, groupSize, internalBlkLen,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK);
+
+    lastBlock.setNumBytes(internalBlkLen * NUM_DATA_BLOCKS);
+    return lastBlock;
+  }
+
+  /** Send one IBR with the given status and length per internal block. */
+  private static void reportInternalBlocks(List<DataNode> dataNodes,
+      FSNamesystem ns, BlockInfo group, int groupSize, long numBytes,
+      ReceivedDeletedBlockInfo.BlockStatus status) throws Exception {
+    int i = 0;
+    for (DataNode dn : dataNodes) {
+      if (i >= groupSize) {
+        break;
+      }
+      final Block block = new Block(group.getBlockId() + i++, numBytes,
+          group.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block, status, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
new file mode 100644
index 0000000..0032bdd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestReadStripedFile {
+
+  public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2; // passed to createECFile
+  private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * GROUP_SIZE * CELLSIZE;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
+        .build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Plan a read and compare each ReadPortion with the expected values. */
+  private void testPlanReadPortions(int startInBlk, int length,
+      int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+      int[][] bufferOffsets, int[][] bufferLengths) {
+    final ReadPortion[] portions = DFSStripedInputStream.planReadPortions(
+        GROUP_SIZE, CELLSIZE, startInBlk, length, bufferOffset);
+    assertEquals(GROUP_SIZE, portions.length);
+
+    for (int blk = 0; blk < GROUP_SIZE; blk++) {
+      final ReadPortion p = portions[blk];
+      assertEquals(readLengths[blk], p.getReadLength());
+      assertEquals(offsetsInBlock[blk], p.getStartOffsetInBlock());
+      assertArrayEquals(bufferOffsets[blk], p.getOffsets());
+      assertArrayEquals(bufferLengths[blk], p.getLengths());
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#planReadPortions}
+   */
+  @Test
+  public void testPlanReadPortions() {
+    // NOTE(review): every expected array below has 3 entries, so the cases
+    // are written for GROUP_SIZE == 3.
+    // start block offset is 0, read cellSize - 10
+    testPlanReadPortions(0, CELLSIZE - 10, 0,
+        new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 3 * cellSize
+     */
+    testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 0, read cellSize + 10
+     */
+    testPlanReadPortions(0, CELLSIZE + 10, 0,
+        new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+     */
+    testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+        new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+            new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+            new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, 10}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize, buffer start offset is 100
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+            new int[]{100 + CELLSIZE - 2},
+            new int[]{100 + CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 2},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize + 10
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+            new int[]{CELLSIZE - 2},
+            new int[]{CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 12},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+        new int[]{CELLSIZE, CELLSIZE - 1, 0},
+        new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+            new int[]{1, 3 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{1, CELLSIZE, 9},
+            new int[]{CELLSIZE, CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+        new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+            new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE, 9},
+            new int[]{1, CELLSIZE, CELLSIZE}});
+  }
+
+  private LocatedStripedBlock createDummyLocatedBlock() {
+    final long blockGroupID = -1048576;
+    DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
+    String[] storageIDs = new String[TOTAL_SIZE];
+    StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
+    int[] indices = new int[TOTAL_SIZE];
+    for (int i = 0; i < TOTAL_SIZE; i++) {
+      locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
+      storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
+      storageTypes[i] = StorageType.DISK;
+      indices[i] = (i + 2) % GROUP_SIZE;
+    }
+    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
+        locs, storageIDs, storageTypes, indices, 0, false, null);
+  }
+
+  @Test
+  public void testParseDummyStripedBlock() {
+    LocatedStripedBlock lsb = createDummyLocatedBlock();
+    LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
+        lsb, GROUP_SIZE, CELLSIZE);
+    assertEquals(GROUP_SIZE, blocks.length);
+    for (int j = 0; j < GROUP_SIZE; j++) {
+      assertFalse(blocks[j].isStriped());
+      assertEquals(j,
+          BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
+      assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
+    }
+  }
+
+  @Test
+  public void testParseStripedBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+    // One located block is expected per block group created above.
+    assertEquals(numBlocks, lbs.locatedBlockCount());
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock lb : lbList) {
+      assertTrue(lb.isStriped());
+    }
+    // Each group parses into GROUP_SIZE plain blocks ordered by index.
+    for (int i = 0; i < numBlocks; i++) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
+      LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+          GROUP_SIZE, CELLSIZE);
+      assertEquals(GROUP_SIZE, blks.length);
+      for (int j = 0; j < GROUP_SIZE; j++) {
+        assertFalse(blks[j].isStriped());
+        assertEquals(j,
+            BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
+        assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
+      }
+    }
+  }
+
+  /** Test {@link DFSStripedInputStream#getBlockAt(long)} */
+  @Test
+  public void testGetBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+    final DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+    // Close the stream even when an assertion fails; it used to be leaked.
+    try {
+      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+        LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(
+            (LocatedStripedBlock) lb, GROUP_SIZE, CELLSIZE);
+        for (int j = 0; j < GROUP_SIZE; j++) {
+          LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+          assertEquals(blks[j].getBlock(), refreshed.getBlock());
+          assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+          assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+        }
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  @Test
+  public void testPread() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE);
+    assertTrue(lbs.get(0) instanceof LocatedStripedBlock); // not a bare assert
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
+          bg.getBlock().getGenerationStamp()); // ctor already sets the stamp
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+    try {
+      in.setCellSize(CELLSIZE);
+      byte[] readBuffer = new byte[BLOCKSIZE];
+      int ret = in.read(0, readBuffer, 0, BLOCKSIZE);
+      assertEquals(BLOCKSIZE, ret);
+      // TODO: verify read results with patterned data from HDFS-8117
+    } finally {
+      in.close(); // the stream was never closed before
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index d965ae7..b2ff6c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -18,15 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
-import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -84,83 +75,10 @@ public class TestRecoverStripedBlocks {
     }
   }
 
-  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
-      int numBlocks) throws Exception {
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    dfs.mkdirs(dir);
-    dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
-
-    FSDataOutputStream out = null;
-    try {
-      out = dfs.create(file, (short) 1); // create an empty file
-
-      FSNamesystem ns = cluster.getNamesystem();
-      FSDirectory fsdir = ns.getFSDirectory();
-      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-
-      ExtendedBlock previous = null;
-      for (int i = 0; i < numBlocks; i++) {
-        Block newBlock = createBlock(cluster.getDataNodes(), ns,
-            file.toString(), fileNode, dfs.getClient().getClientName(),
-            previous);
-        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
-      }
-
-      ns.completeFile(file.toString(), dfs.getClient().getClientName(),
-          previous, fileNode.getId());
-    } finally {
-      IOUtils.cleanup(null, out);
-    }
-  }
-
-  static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
-      String file, INodeFile fileNode, String clientName,
-      ExtendedBlock previous) throws Exception {
-    ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
-        null);
-
-    final BlockInfo lastBlock = fileNode.getLastBlock();
-    final int groupSize = fileNode.getBlockReplication();
-    // 1. RECEIVING_BLOCK IBR
-    int i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++, 0,
-            lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
-      }
-    }
-
-    // 2. RECEIVED_BLOCK IBR
-    i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++,
-            BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
-      }
-    }
-
-    lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
-    return lastBlock;
-  }
-
   @Test
   public void testMissingStripedBlock() throws Exception {
     final int numBlocks = 4;
-    createECFile(cluster, filePath, dirPath, numBlocks);
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
 
     // make sure the file is complete in NN
     final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
@@ -172,7 +90,7 @@ public class TestRecoverStripedBlocks {
     for (BlockInfo blk : blocks) {
       assertTrue(blk.isStriped());
       assertTrue(blk.isComplete());
-      assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+      assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
       final BlockInfoStriped sb = (BlockInfoStriped) blk;
       assertEquals(GROUP_SIZE, sb.numNodes());
     }


[04/12] hadoop git commit: HADOOP-11818 Minor improvements for erasurecode classes. Contributed by Rakesh R

Posted by zh...@apache.org.
HADOOP-11818 Minor improvements for erasurecode classes. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc2202c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc2202c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc2202c2

Branch: refs/heads/HDFS-7285
Commit: cc2202c276f71de4363c509a6223281d58f83f3f
Parents: d8c0ecf
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 10 04:31:48 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700

----------------------------------------------------------------------
 .../hadoop-common/CHANGES-HDFS-EC-7285.txt       |  2 ++
 .../hadoop/io/erasurecode/SchemaLoader.java      | 12 ++++++------
 .../io/erasurecode/coder/RSErasureDecoder.java   | 19 ++++++++++++++++++-
 .../io/erasurecode/coder/RSErasureEncoder.java   | 19 ++++++++++++++++++-
 .../io/erasurecode/coder/XORErasureDecoder.java  |  2 +-
 .../io/erasurecode/rawcoder/util/RSUtil.java     | 17 +++++++++++++++++
 6 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index c72394e..b850e11 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -40,3 +40,5 @@
 
     HADOOP-11645. Erasure Codec API covering the essential aspects for an erasure code
     ( Kai Zheng via vinayakumarb )
+  
+    HADOOP-11818. Minor improvements for erasurecode classes. (Rakesh R via Kai Zheng)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
index c51ed37..75dd03a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.io.erasurecode;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.w3c.dom.*;
@@ -36,7 +36,7 @@ import java.util.*;
  * A EC schema loading utility that loads predefined EC schemas from XML file
  */
 public class SchemaLoader {
-  private static final Log LOG = LogFactory.getLog(SchemaLoader.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaLoader.class.getName());
 
   /**
    * Load predefined ec schemas from configuration file. This file is
@@ -63,7 +63,7 @@ public class SchemaLoader {
   private List<ECSchema> loadSchema(File schemaFile)
       throws ParserConfigurationException, IOException, SAXException {
 
-    LOG.info("Loading predefined EC schema file " + schemaFile);
+    LOG.info("Loading predefined EC schema file {}", schemaFile);
 
     // Read and parse the schema file.
     DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
@@ -87,7 +87,7 @@ public class SchemaLoader {
           ECSchema schema = loadSchema(element);
             schemas.add(schema);
         } else {
-          LOG.warn("Bad element in EC schema configuration file: " +
+          LOG.warn("Bad element in EC schema configuration file: {}",
               element.getTagName());
         }
       }
@@ -109,7 +109,7 @@ public class SchemaLoader {
       URL url = Thread.currentThread().getContextClassLoader()
           .getResource(schemaFilePath);
       if (url == null) {
-        LOG.warn(schemaFilePath + " not found on the classpath.");
+        LOG.warn("{} not found on the classpath.", schemaFilePath);
         schemaFile = null;
       } else if (! url.getProtocol().equalsIgnoreCase("file")) {
         throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
index e2c5051..fc664a5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureDecoder.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.conf.Configuration;
@@ -11,7 +28,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
 /**
  * Reed-Solomon erasure decoder that decodes a block group.
  *
- * It implements {@link ErasureDecoder}.
+ * It implements {@link ErasureCoder}.
  */
 public class RSErasureDecoder extends AbstractErasureDecoder {
   private RawErasureDecoder rsRawDecoder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
index a7d02b5..18ca5ac 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -9,7 +26,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 /**
  * Reed-Solomon erasure encoder that encodes a block group.
  *
- * It implements {@link ErasureEncoder}.
+ * It implements {@link ErasureCoder}.
  */
 public class RSErasureEncoder extends AbstractErasureEncoder {
   private RawErasureEncoder rawEncoder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
index 6f4b423..0672549 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
 /**
  * Xor erasure decoder that decodes a block group.
  *
- * It implements {@link ErasureDecoder}.
+ * It implements {@link ErasureCoder}.
  */
 public class XORErasureDecoder extends AbstractErasureDecoder {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc2202c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
index 33ba561..8badf02 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/RSUtil.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.io.erasurecode.rawcoder.util;
 
 /**


[12/12] hadoop git commit: HDFS-8114. Erasure coding: Add auditlog FSNamesystem#createErasureCodingZone if this operation fails. Contributed by Rakesh R.

Posted by zh...@apache.org.
HDFS-8114. Erasure coding: Add auditlog FSNamesystem#createErasureCodingZone if this operation fails. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e202bac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e202bac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e202bac

Branch: refs/heads/HDFS-7285
Commit: 6e202bac1aea7de46bd836d6039c4be543982170
Parents: 9708ae6
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 11:15:02 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:21 2015 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSNamesystem.java      | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e202bac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index dfd0382..1a9c529 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8116,11 +8116,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       SafeModeException, AccessControlException {
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
-    checkSuperuserPrivilege();
-    checkOperation(OperationCategory.WRITE);
-    final byte[][] pathComponents =
-        FSDirectory.getPathComponentsForReservedPath(src);
-    FSPermissionChecker pc = getPermissionChecker();
+    FSPermissionChecker pc = null;
+    byte[][] pathComponents = null;
+    boolean success = false;
+    try {
+      checkSuperuserPrivilege();
+      checkOperation(OperationCategory.WRITE);
+      pathComponents =
+          FSDirectory.getPathComponentsForReservedPath(src);
+      pc = getPermissionChecker();
+    } catch (Throwable e) {
+      logAuditEvent(success, "createErasureCodingZone", srcArg);
+      throw e;
+    }
     writeLock();
     try {
       checkSuperuserPrivilege();
@@ -8134,11 +8142,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
       final INodesInPath iip = dir.getINodesInPath4Write(src, false);
       resultingStat = dir.getAuditFileInfo(iip);
+      success = true;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
+    logAuditEvent(success, "createErasureCodingZone", srcArg, null, resultingStat);
   }
 
   /**


[08/12] hadoop git commit: HDFS-8122. Erasure Coding: Support specifying ECSchema during creation of ECZone. Contributed by Vinayakumar B.

Posted by zh...@apache.org.
HDFS-8122. Erasure Coding: Support specifying ECSchema during creation of ECZone. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9708ae63
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9708ae63
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9708ae63

Branch: refs/heads/HDFS-7285
Commit: 9708ae637b51e9235318a1fae962f9d1f73132dd
Parents: f8992da
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 11:08:57 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  6 ++--
 .../hadoop/hdfs/DistributedFileSystem.java      | 33 ++++++++++++++++++++
 .../hadoop/hdfs/protocol/ClientProtocol.java    |  6 ++--
 ...tNamenodeProtocolServerSideTranslatorPB.java |  4 ++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  5 ++-
 .../namenode/ErasureCodingZoneManager.java      | 30 +++++++++++++-----
 .../hdfs/server/namenode/FSDirectory.java       | 23 ++++++++------
 .../hdfs/server/namenode/FSNamesystem.java      | 19 ++++++-----
 .../hdfs/server/namenode/NameNodeRpcServer.java |  6 ++--
 .../src/main/proto/ClientNamenodeProtocol.proto |  1 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  2 +-
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |  2 +-
 .../hadoop/hdfs/TestErasureCodingZones.java     | 18 +++++------
 .../server/namenode/TestAddStripedBlocks.java   |  2 +-
 .../server/namenode/TestFSEditLogLoader.java    |  4 +--
 .../hdfs/server/namenode/TestFSImage.java       |  4 +--
 16 files changed, 112 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 6339d30..7ff9073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1322,7 +1322,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                              Progressable progress,
                              int buffersize,
                              ChecksumOpt checksumOpt) throws IOException {
-    return create(src, permission, flag, createParent, replication, blockSize, 
+    return create(src, permission, flag, createParent, replication, blockSize,
         progress, buffersize, checksumOpt, null);
   }
 
@@ -2966,12 +2966,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return new EncryptionZoneIterator(namenode, traceSampler);
   }
 
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     checkOpen();
     TraceScope scope = getPathTraceScope("createErasureCodingZone", src);
     try {
-      namenode.createErasureCodingZone(src);
+      namenode.createErasureCodingZone(src, schema);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
           SafeModeException.class,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 21f5107..160aae3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
@@ -2217,4 +2218,36 @@ public class DistributedFileSystem extends FileSystem {
       throws IOException {
     return dfs.getInotifyEventStream(lastReadTxid);
   }
+
+  /**
+   * Create an erasure coding zone rooted at the given directory.
+   *
+   * @param path Directory to create the ec zone
+   * @param schema ECSchema for the zone. If null, the default will be used.
+   * @throws IOException if the request fails
+   */
+  public void createErasureCodingZone(final Path path, final ECSchema schema)
+      throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        // Forward the caller's schema (previously dropped by passing null).
+        dfs.createErasureCodingZone(getPathName(p), schema);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).createErasureCodingZone(p, schema);
+          return null;
+        }
+        throw new UnsupportedOperationException(
+            "Cannot createErasureCodingZone through a symlink to a "
+                + "non-DistributedFileSystem: " + path + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 7f5ac49..0c04ca9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1364,11 +1364,11 @@ public interface ClientProtocol {
       long prevId) throws IOException;
 
   /**
-   * Create an erasure coding zone (currently with hardcoded schema)
-   * TODO: Configurable and pluggable schemas (HDFS-7337)
+   * Create an erasure coding zone with specified schema, if any, otherwise
+   * default
    */
   @Idempotent
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 33207ab..cc5ca55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -1403,7 +1403,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, CreateErasureCodingZoneRequestProto req)
       throws ServiceException {
     try {
-      server.createErasureCodingZone(req.getSrc());
+      ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
+          .getSchema()) : null;
+      server.createErasureCodingZone(req.getSrc(), schema);
       return CreateErasureCodingZoneResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 0211522..2e17823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -1420,11 +1420,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
-  public void createErasureCodingZone(String src)
+  public void createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     final CreateErasureCodingZoneRequestProto.Builder builder =
         CreateErasureCodingZoneRequestProto.newBuilder();
     builder.setSrc(src);
+    if (schema != null) {
+      builder.setSchema(PBHelper.convertECSchema(schema));
+    }
     CreateErasureCodingZoneRequestProto req = builder.build();
     try {
       rpcProxy.createErasureCodingZone(null, req);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index 606e804..c7daa2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,7 +53,11 @@ public class ErasureCodingZoneManager {
     this.dir = dir;
   }
 
-  boolean getECPolicy(INodesInPath iip) {
+  boolean getECPolicy(INodesInPath iip) throws IOException {
+    return getECSchema(iip) != null;
+  }
+
+  ECSchema getECSchema(INodesInPath iip) throws IOException{
     assert dir.hasReadLock();
     Preconditions.checkNotNull(iip);
     List<INode> inodes = iip.getReadOnlyINodes();
@@ -64,21 +71,23 @@ public class ErasureCodingZoneManager {
       // EC
       // TODO: properly support symlinks in EC zones
       if (inode.isSymlink()) {
-        return false;
+        return null;
       }
       final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
           new ArrayList<XAttr>(0)
           : inode.getXAttrFeature().getXAttrs();
       for (XAttr xAttr : xAttrs) {
         if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
-          return true;
+          ECSchemaProto ecSchemaProto;
+          ecSchemaProto = ECSchemaProto.parseFrom(xAttr.getValue());
+          return PBHelper.convertECSchema(ecSchemaProto);
         }
       }
     }
-    return false;
+    return null;
   }
 
-  XAttr createErasureCodingZone(String src)
+  XAttr createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     assert dir.hasWriteLock();
     final INodesInPath srcIIP = dir.getINodesInPath4Write(src, false);
@@ -97,8 +106,15 @@ public class ErasureCodingZoneManager {
       throw new IOException("Directory " + src + " is already in an " +
           "erasure coding zone.");
     }
-    final XAttr ecXAttr = XAttrHelper
-        .buildXAttr(XATTR_ERASURECODING_ZONE, null);
+    // TODO HDFS-7859 Need to persist the schema in xattr in efficient way
+    // As of now storing the protobuf format
+    if (schema == null) {
+      schema = ECSchemaManager.getSystemDefaultSchema();
+    }
+    ECSchemaProto schemaProto = PBHelper.convertECSchema(schema);
+    byte[] schemaBytes = schemaProto.toByteArray();
+    final XAttr ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
+        schemaBytes);
     final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
     xattrs.add(ecXAttr);
     FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index d68128a..188425e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -63,9 +63,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -414,11 +414,12 @@ public class FSDirectory implements Closeable {
    * Add the given filename to the fs.
    * @return the new INodesInPath instance that contains the new INode
    */
-  INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
-      permissions, short replication, long preferredBlockSize,
+  INodesInPath addFile(INodesInPath existing, String localName,
+      PermissionStatus permissions, short replication, long preferredBlockSize,
       String clientName, String clientMachine)
-    throws FileAlreadyExistsException, QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException, AclException {
+      throws FileAlreadyExistsException, QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException, AclException,
+      IOException {
 
     long modTime = now();
     INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
@@ -1400,20 +1401,24 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  XAttr createErasureCodingZone(String src)
+  XAttr createErasureCodingZone(String src, ECSchema schema)
       throws IOException {
     writeLock();
     try {
-      return ecZoneManager.createErasureCodingZone(src);
+      return ecZoneManager.createErasureCodingZone(src, schema);
     } finally {
       writeUnlock();
     }
   }
 
-  public boolean getECPolicy(INodesInPath iip) {
+  public boolean getECPolicy(INodesInPath iip) throws IOException {
+    return getECSchema(iip) != null;
+  }
+
+  ECSchema getECSchema(INodesInPath iip) throws IOException {
     readLock();
     try {
-      return ecZoneManager.getECPolicy(iip);
+      return ecZoneManager.getECSchema(iip);
     } finally {
       readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6d89b0c..dfd0382 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8103,16 +8103,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   /**
    * Create an erasure coding zone on directory src.
-   *
+   * @param schema  ECSchema for the erasure coding zone
    * @param src     the path of a directory which will be the root of the
    *                erasure coding zone. The directory must be empty.
+   *
    * @throws AccessControlException  if the caller is not the superuser.
    * @throws UnresolvedLinkException if the path can't be resolved.
    * @throws SafeModeException       if the Namenode is in safe mode.
    */
-  void createErasureCodingZone(final String srcArg,
-      final boolean logRetryCache)
-      throws IOException, UnresolvedLinkException,
+  void createErasureCodingZone(final String srcArg, final ECSchema schema,
+      final boolean logRetryCache) throws IOException, UnresolvedLinkException,
       SafeModeException, AccessControlException {
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
@@ -8128,7 +8128,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot create erasure coding zone on " + src);
       src = dir.resolvePath(pc, src, pathComponents);
 
-      final XAttr ecXAttr = dir.createErasureCodingZone(src);
+      final XAttr ecXAttr = dir.createErasureCodingZone(src, schema);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(ecXAttr);
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
@@ -8158,11 +8158,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (isPermissionEnabled) {
         dir.checkPathAccess(pc, iip, FsAction.READ);
       }
-      if (dir.getECPolicy(iip)) {
-        // TODO HDFS-8074 and HDFS-7859 : To get from loaded schemas
-        Map<String, String> options = new HashMap<String, String>();
-        ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
-        return new ECInfo(src, defaultSchema);
+      // Get schema set for the zone
+      ECSchema schema = dir.getECSchema(iip);
+      if (schema != null) {
+        return new ECInfo(src, schema);
       }
     } finally {
       readUnlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 85ac1e5..5e01c77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1836,8 +1836,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
-  public void createErasureCodingZone(String src)
-    throws IOException {
+  public void createErasureCodingZone(String src, ECSchema schema)
+      throws IOException {
     checkNNStartup();
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1845,7 +1845,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     boolean success = false;
     try {
-      namesystem.createErasureCodingZone(src, cacheEntry != null);
+      namesystem.createErasureCodingZone(src, schema, cacheEntry != null);
     } finally {
       RetryCache.setState(cacheEntry, success);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 3389a22..c9059bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -716,6 +716,7 @@ message GetEditsFromTxidResponseProto {
 
 message CreateErasureCodingZoneRequestProto {
   required string src = 1;
+  optional ECSchemaProto schema = 2;
 }
 
 message CreateErasureCodingZoneResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index e3a85cc..4c80388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1815,7 +1815,7 @@ public class DFSTestUtil {
       int numBlocks, int numStripesPerBlk) throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
     dfs.mkdirs(dir);
-    dfs.getClient().createErasureCodingZone(dir.toString());
+    dfs.getClient().createErasureCodingZone(dir.toString(), null);
 
     FSDataOutputStream out = null;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index ee6998b..c78922e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -50,7 +50,7 @@ public class TestDFSStripedOutputStream {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/");
+    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
index bdca915..699df4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
@@ -64,7 +64,7 @@ public class TestErasureCodingZones {
     fs.mkdir(testDir, FsPermission.getDirDefault());
 
     /* Normal creation of an erasure coding zone */
-    fs.getClient().createErasureCodingZone(testDir.toString());
+    fs.getClient().createErasureCodingZone(testDir.toString(), null);
 
     /* Verify files under the zone are striped */
     final Path ECFilePath = new Path(testDir, "foo");
@@ -77,7 +77,7 @@ public class TestErasureCodingZones {
     fs.mkdir(notEmpty, FsPermission.getDirDefault());
     fs.create(new Path(notEmpty, "foo"));
     try {
-      fs.getClient().createErasureCodingZone(notEmpty.toString());
+      fs.getClient().createErasureCodingZone(notEmpty.toString(), null);
       fail("Erasure coding zone on non-empty dir");
     } catch (IOException e) {
       assertExceptionContains("erasure coding zone for a non-empty directory", e);
@@ -87,10 +87,10 @@ public class TestErasureCodingZones {
     final Path zone1 = new Path("/zone1");
     final Path zone2 = new Path(zone1, "zone2");
     fs.mkdir(zone1, FsPermission.getDirDefault());
-    fs.getClient().createErasureCodingZone(zone1.toString());
+    fs.getClient().createErasureCodingZone(zone1.toString(), null);
     fs.mkdir(zone2, FsPermission.getDirDefault());
     try {
-      fs.getClient().createErasureCodingZone(zone2.toString());
+      fs.getClient().createErasureCodingZone(zone2.toString(), null);
       fail("Nested erasure coding zones");
     } catch (IOException e) {
       assertExceptionContains("already in an erasure coding zone", e);
@@ -100,7 +100,7 @@ public class TestErasureCodingZones {
     final Path fPath = new Path("/file");
     fs.create(fPath);
     try {
-      fs.getClient().createErasureCodingZone(fPath.toString());
+      fs.getClient().createErasureCodingZone(fPath.toString(), null);
       fail("Erasure coding zone on file");
     } catch (IOException e) {
       assertExceptionContains("erasure coding zone for a file", e);
@@ -113,8 +113,8 @@ public class TestErasureCodingZones {
     final Path dstECDir = new Path("/dstEC");
     fs.mkdir(srcECDir, FsPermission.getDirDefault());
     fs.mkdir(dstECDir, FsPermission.getDirDefault());
-    fs.getClient().createErasureCodingZone(srcECDir.toString());
-    fs.getClient().createErasureCodingZone(dstECDir.toString());
+    fs.getClient().createErasureCodingZone(srcECDir.toString(), null);
+    fs.getClient().createErasureCodingZone(dstECDir.toString(), null);
     final Path srcFile = new Path(srcECDir, "foo");
     fs.create(srcFile);
 
@@ -158,7 +158,7 @@ public class TestErasureCodingZones {
     // dir ECInfo before creating ec zone
     assertNull(fs.getClient().getErasureCodingInfo(src));
     // dir ECInfo after creating ec zone
-    fs.getClient().createErasureCodingZone(src);
+    fs.getClient().createErasureCodingZone(src, null);
     verifyErasureCodingInfo(src);
     fs.create(new Path(ecDir, "/child1")).close();
     // verify for the files in ec zone
@@ -182,4 +182,4 @@ public class TestErasureCodingZones {
     assertEquals("Default chunkSize should be used",
         ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index c3c8239..27df1cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -68,7 +68,7 @@ public class TestAddStripedBlocks {
         .numDataNodes(GROUP_SIZE).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
-    dfs.getClient().createErasureCodingZone("/");
+    dfs.getClient().createErasureCodingZone("/", null);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 0eeb7f8..c18fd5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -444,7 +444,7 @@ public class TestFSEditLogLoader {
 
       //set the storage policy of the directory
       fs.mkdir(new Path(testDir), new FsPermission("755"));
-      fs.getClient().getNamenode().createErasureCodingZone(testDir);
+      fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
 
       // Create a file with striped block
       Path p = new Path(testFilePath);
@@ -516,7 +516,7 @@ public class TestFSEditLogLoader {
 
       //set the storage policy of the directory
       fs.mkdir(new Path(testDir), new FsPermission("755"));
-      fs.getClient().getNamenode().createErasureCodingZone(testDir);
+      fs.getClient().getNamenode().createErasureCodingZone(testDir, null);
 
       //create a file with striped blocks
       Path p = new Path(testFilePath);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9708ae63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index a456cad..fe130a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -135,7 +135,7 @@ public class TestFSImage {
   private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
                                                boolean isUC) throws IOException{
     // contruct a INode with StripedBlock for saving and loading
-    fsn.createErasureCodingZone("/", false);
+    fsn.createErasureCodingZone("/", null, false);
     long id = 123456789;
     byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
     PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
@@ -397,7 +397,7 @@ public class TestFSImage {
           .build();
       cluster.waitActive();
       DistributedFileSystem fs = cluster.getFileSystem();
-      fs.getClient().getNamenode().createErasureCodingZone("/");
+      fs.getClient().getNamenode().createErasureCodingZone("/", null);
       Path file = new Path("/striped");
       FSDataOutputStream out = fs.create(file);
       byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);


[05/12] hadoop git commit: HDFS-8077. Erasure coding: fix bugs in EC zone and symlinks. Contributed by Jing Zhao and Zhe Zhang.

Posted by zh...@apache.org.
HDFS-8077. Erasure coding: fix bugs in EC zone and symlinks. Contributed by Jing Zhao and Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58d9f264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58d9f264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58d9f264

Branch: refs/heads/HDFS-7285
Commit: 58d9f264645bbb10c522ce5df7d69ed6ec8b3e2d
Parents: cc2202c
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Apr 9 17:53:22 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700

----------------------------------------------------------------------
 .../BlockInfoStripedUnderConstruction.java         |  2 +-
 .../hdfs/server/blockmanagement/BlockManager.java  | 12 ++++++------
 .../server/namenode/ErasureCodingZoneManager.java  |  7 +++++++
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |  4 ++--
 .../hdfs/server/namenode/FSEditLogLoader.java      | 11 ++++++-----
 .../hdfs/server/namenode/FSImageSerialization.java |  4 ++--
 .../hadoop/hdfs/server/namenode/INodeFile.java     | 17 ++++-------------
 .../hdfs/server/namenode/TestFSEditLogLoader.java  |  4 ++--
 .../hadoop/hdfs/server/namenode/TestFSImage.java   |  2 +-
 .../server/namenode/TestRecoverStripedBlocks.java  |  2 +-
 10 files changed, 32 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
index cfaf3a0..0373314 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@ -96,7 +96,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped
     for(int i = 0; i < numLocations; i++) {
       // when creating a new block we simply sequentially assign block index to
       // each storage
-      Block blk = new Block(this.getBlockId() + i, this.getGenerationStamp(), 0);
+      Block blk = new Block(this.getBlockId() + i, 0, this.getGenerationStamp());
       replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
           ReplicaState.RBW);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fcf1421..94aafc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2450,12 +2450,12 @@ public class BlockManager {
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
-          return new BlockToMarkCorrupt(reported, storedBlock,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +
               "length in block map " + storedBlock.getNumBytes(),
@@ -2466,7 +2466,7 @@ public class BlockManager {
       case UNDER_CONSTRUCTION:
         if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
               "block is " + ucState + " and reported state " + reportedState
               + ", But reported genstamp " + reportedGS
               + " does not match genstamp in block map "
@@ -2482,7 +2482,7 @@ public class BlockManager {
         return null; // not corrupt
       } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
         final long reportedGS = reported.getGenerationStamp();
-        return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+        return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
             "reported " + reportedState + " replica with genstamp " + reportedGS
             + " does not match COMPLETE block's genstamp in block map "
             + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2497,7 +2497,7 @@ public class BlockManager {
               "complete with the same genstamp");
           return null;
         } else {
-          return new BlockToMarkCorrupt(reported, storedBlock,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "reported replica has invalid state " + reportedState,
               Reason.INVALID_STATE);
         }
@@ -2510,7 +2510,7 @@ public class BlockManager {
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(reported, storedBlock, msg,
+      return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
           Reason.INVALID_STATE);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index d4ff7c5..606e804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@ -59,6 +59,13 @@ public class ErasureCodingZoneManager {
       if (inode == null) {
         continue;
       }
+      // We don't allow symlinks in an EC zone, or symlinks pointing to a
+      // file/dir in an EC zone. Therefore, if a symlink is encountered, the
+      // dir shouldn't have EC.
+      // TODO: properly support symlinks in EC zones
+      if (inode.isSymlink()) {
+        return false;
+      }
       final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
           new ArrayList<XAttr>(0)
           : inode.getXAttrFeature().getXAttrs();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 53d2040..d68128a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -468,8 +468,8 @@ public class FSDirectory implements Closeable {
     try {
       INodesInPath iip = addINode(existing, newNode);
       if (iip != null) {
-        // TODO: we will no longer use storage policy for "Erasure Coding Zone"
-        if (newNode.isStriped()) {
+        // check if the file is in an EC zone
+        if (getECPolicy(iip)) {
           newNode.addStripedBlocksFeature();
         }
         if (aclEntries != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 89cfe05..f530772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -417,7 +417,7 @@ public class FSEditLogLoader {
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       // TODO whether the file is striped should later be retrieved from iip
-      updateBlocks(fsDir, addCloseOp, iip, newFile, newFile.isStriped());
+      updateBlocks(fsDir, addCloseOp, iip, newFile, fsDir.getECPolicy(iip));
       break;
     }
     case OP_CLOSE: {
@@ -438,7 +438,7 @@ public class FSEditLogLoader {
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
       // TODO whether the file is striped should later be retrieved from iip
-      updateBlocks(fsDir, addCloseOp, iip, file, file.isStriped());
+      updateBlocks(fsDir, addCloseOp, iip, file, fsDir.getECPolicy(iip));
 
       // Now close the file
       if (!file.isUnderConstruction() &&
@@ -497,7 +497,7 @@ public class FSEditLogLoader {
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
       // TODO whether the file is striped should later be retrieved from iip
-      updateBlocks(fsDir, updateOp, iip, oldFile, oldFile.isStriped());
+      updateBlocks(fsDir, updateOp, iip, oldFile, fsDir.getECPolicy(iip));
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
@@ -511,10 +511,11 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " new block id : " + addBlockOp.getLastBlock().getBlockId());
       }
-      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // add the new block to the INodeFile
       // TODO whether the file is striped should later be retrieved from iip
-      addNewBlock(addBlockOp, oldFile, oldFile.isStriped());
+      addNewBlock(addBlockOp, oldFile, fsDir.getECPolicy(iip));
       break;
     }
     case OP_SET_REPLICATION: {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 1e58858..58244e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -207,7 +207,7 @@ public class FSImageSerialization {
     out.writeLong(cons.getModificationTime());
     out.writeLong(cons.getPreferredBlockSize());
     // whether the file has striped blocks
-    out.writeBoolean(cons.isWithStripedBlocks());
+    out.writeBoolean(cons.isStriped());
     writeBlocks(cons.getBlocks(), out);
     cons.getPermissionStatus().write(out);
 
@@ -233,7 +233,7 @@ public class FSImageSerialization {
     out.writeLong(file.getAccessTime());
     out.writeLong(file.getPreferredBlockSize());
     // whether the file has striped blocks
-    out.writeBoolean(file.isWithStripedBlocks());
+    out.writeBoolean(file.isStriped());
     writeBlocks(file.getBlocks(), out);
     SnapshotFSImageFormat.saveFileDiffList(file, out);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index f95e54e..b5c510e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -185,17 +185,13 @@ public class INodeFile extends INodeWithAdditionalFields
   public FileWithStripedBlocksFeature addStripedBlocksFeature() {
     assert blocks == null || blocks.length == 0:
         "The file contains contiguous blocks";
-    assert !isWithStripedBlocks();
+    assert !isStriped();
     this.setFileReplication((short) 0);
     FileWithStripedBlocksFeature sb = new FileWithStripedBlocksFeature();
     addFeature(sb);
     return sb;
   }
 
-  public boolean isWithStripedBlocks() {
-    return getStripedBlocksFeature() != null;
-  }
-
   /** Used to make sure there is no contiguous block related info */
   private boolean hasNoContiguousBlock() {
     return (blocks == null || blocks.length == 0) && getFileReplication() == 0;
@@ -431,7 +427,7 @@ public class INodeFile extends INodeWithAdditionalFields
   /** Set the replication factor of this file. */
   public final INodeFile setFileReplication(short replication,
       int latestSnapshotId) throws QuotaExceededException {
-    Preconditions.checkState(!isWithStripedBlocks(),
+    Preconditions.checkState(!isStriped(),
         "Cannot set replication to a file with striped blocks");
     recordModification(latestSnapshotId);
     setFileReplication(replication);
@@ -653,7 +649,7 @@ public class INodeFile extends INodeWithAdditionalFields
     long nsDelta = 1;
     final long ssDeltaNoReplication;
     short replication;
-    if (isWithStripedBlocks()) {
+    if (isStriped()) {
       return computeQuotaUsageWithStriped(bsps, counts);
     }
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
@@ -695,11 +691,6 @@ public class INodeFile extends INodeWithAdditionalFields
 
   /**
    * Compute quota of striped file
-   * @param bsps
-   * @param counts
-   * @param useCache
-   * @param lastSnapshotId
-   * @return quota counts
    */
   public final QuotaCounts computeQuotaUsageWithStriped(
       BlockStoragePolicySuite bsps, QuotaCounts counts) {
@@ -828,7 +819,7 @@ public class INodeFile extends INodeWithAdditionalFields
    * Use preferred block size for the last block if it is under construction.
    */
   public final long storagespaceConsumed() {
-    if (isWithStripedBlocks()) {
+    if (isStriped()) {
       return storagespaceConsumedWithStriped();
     } else {
       return storagespaceConsumedNoReplication() * getBlockReplication();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 407d07e..0eeb7f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -472,7 +472,7 @@ public class TestFSEditLogLoader {
       INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory()
           .getINode(testFilePath);
 
-      assertTrue(inodeLoaded.isWithStripedBlocks());
+      assertTrue(inodeLoaded.isStriped());
 
       BlockInfoStriped[] blks = (BlockInfoStriped[])inodeLoaded.getBlocks();
       assertEquals(1, blks.length);
@@ -551,7 +551,7 @@ public class TestFSEditLogLoader {
       INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory()
           .getINode(testFilePath);
 
-      assertTrue(inodeLoaded.isWithStripedBlocks());
+      assertTrue(inodeLoaded.isStriped());
 
       BlockInfoStriped[] blks = (BlockInfoStriped[])inodeLoaded.getBlocks();
       assertEquals(1, blks.length);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 83f01c6..a456cad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -199,7 +199,7 @@ public class TestFSImage {
     assertEquals(mtime, fileByLoaded.getModificationTime());
     assertEquals(isUC ? mtime : atime, fileByLoaded.getAccessTime());
     assertEquals(0, fileByLoaded.getContiguousBlocks().length);
-    assertEquals(0, fileByLoaded.getBlockReplication());
+    assertEquals(0, fileByLoaded.getFileReplication());
     assertEquals(preferredBlockSize, fileByLoaded.getPreferredBlockSize());
 
     //check the BlockInfoStriped

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58d9f264/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index b2ff6c8..4292f9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -84,7 +84,7 @@ public class TestRecoverStripedBlocks {
     final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
         .getINode4Write(filePath.toString()).asFile();
     assertFalse(fileNode.isUnderConstruction());
-    assertTrue(fileNode.isWithStripedBlocks());
+    assertTrue(fileNode.isStriped());
     BlockInfo[] blocks = fileNode.getBlocks();
     assertEquals(numBlocks, blocks.length);
     for (BlockInfo blk : blocks) {


[11/12] hadoop git commit: HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all ECSchemas loaded in Namenode. (Contributed by Vinayakumar B)

Posted by zh...@apache.org.
HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all ECSchemas loaded in Namenode. (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06562136
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06562136
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06562136

Branch: refs/heads/HDFS-7285
Commit: 065621364592d68fabc98d2f8f7aca50d2cbdd8b
Parents: 0331f20
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Apr 10 15:07:32 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/io/erasurecode/ECSchema.java  |  4 +-
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  5 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 11 ++++
 .../hadoop/hdfs/protocol/ClientProtocol.java    | 10 ++++
 ...tNamenodeProtocolServerSideTranslatorPB.java | 19 +++++++
 .../ClientNamenodeProtocolTranslatorPB.java     | 26 ++++++++-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  5 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 17 ++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java |  9 +++-
 .../src/main/proto/ClientNamenodeProtocol.proto |  9 ++++
 .../hadoop-hdfs/src/main/proto/hdfs.proto       |  3 +-
 .../org/apache/hadoop/hdfs/TestECSchemas.java   | 57 ++++++++++++++++++++
 12 files changed, 164 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 8c3310e..32077f6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -123,12 +123,12 @@ public final class ECSchema {
 
     this.chunkSize = DEFAULT_CHUNK_SIZE;
     try {
-      if (options.containsKey(CHUNK_SIZE_KEY)) {
+      if (this.options.containsKey(CHUNK_SIZE_KEY)) {
         this.chunkSize = Integer.parseInt(options.get(CHUNK_SIZE_KEY));
       }
     } catch (NumberFormatException e) {
       throw new IllegalArgumentException("Option value " +
-          options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
+          this.options.get(CHUNK_SIZE_KEY) + " for " + CHUNK_SIZE_KEY +
           " is found. It should be an integer");
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 753795a..5250dfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -58,4 +58,7 @@
 
     HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
 
-    HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file
+    HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
+
+    HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all
+    ECSchemas loaded in Namenode. (vinayakumarb)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index b2a69dd..6339d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -163,6 +163,7 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -3105,6 +3106,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public ECSchema[] getECSchemas() throws IOException {
+    checkOpen();
+    TraceScope scope = Trace.startSpan("getECSchemas", traceSampler);
+    try {
+      return namenode.getECSchemas();
+    } finally {
+      scope.close();
+    }
+  }
+
   public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
     return new DFSInotifyEventInputStream(traceSampler, namenode);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 45d92f3..7f5ac49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.AccessControlException;
@@ -1474,4 +1475,13 @@ public interface ClientProtocol {
    */
   @Idempotent
   public ECInfo getErasureCodingInfo(String src) throws IOException;
+
+  /**
+   * Gets the list of ECSchemas loaded in the Namenode.
+   *
+   * @return the list of ECSchemas loaded in the Namenode
+   * @throws IOException
+   */
+  @Idempotent
+  public ECSchema[] getECSchemas() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 1493e52..33207ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -106,6 +106,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
@@ -218,6 +220,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -1530,4 +1533,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetECSchemasResponseProto getECSchemas(RpcController controller,
+      GetECSchemasRequestProto request) throws ServiceException {
+    try {
+      ECSchema[] ecSchemas = server.getECSchemas();
+      GetECSchemasResponseProto.Builder resBuilder = GetECSchemasResponseProto
+          .newBuilder();
+      for (ECSchema ecSchema : ecSchemas) {
+        resBuilder.addSchemas(PBHelper.convertECSchema(ecSchema));
+      }
+      return resBuilder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 568da68..0211522 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -107,6 +107,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetECSchemasResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
@@ -165,10 +167,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateErasureCodingZoneResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -180,6 +183,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -237,6 +241,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
   VOID_GET_STORAGE_POLICIES_REQUEST =
       GetStoragePoliciesRequestProto.newBuilder().build();
 
+  private final static GetECSchemasRequestProto
+  VOID_GET_ECSCHEMAS_REQUEST = GetECSchemasRequestProto
+      .newBuilder().build();
+
   public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
     rpcProxy = proxy;
   }
@@ -1550,4 +1558,20 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public ECSchema[] getECSchemas() throws IOException {
+    try {
+      GetECSchemasResponseProto response = rpcProxy.getECSchemas(null,
+          VOID_GET_ECSCHEMAS_REQUEST);
+      ECSchema[] schemas = new ECSchema[response.getSchemasCount()];
+      int i = 0;
+      for (ECSchemaProto schemaProto : response.getSchemasList()) {
+        schemas[i++] = PBHelper.convertECSchema(schemaProto);
+      }
+      return schemas;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index afdb302..3d0e9bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3122,8 +3122,6 @@ public class PBHelper {
     for (ECSchemaOptionEntryProto option : optionsList) {
       options.put(option.getKey(), option.getValue());
     }
-    // include chunksize in options.
-    options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
     return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
         schema.getDataUnits(), schema.getParityUnits(), options);
   }
@@ -3133,8 +3131,7 @@ public class PBHelper {
         .setSchemaName(schema.getSchemaName())
         .setCodecName(schema.getCodecName())
         .setDataUnits(schema.getNumDataUnits())
-        .setParityUnits(schema.getNumParityUnits())
-        .setChunkSize(schema.getChunkSize());
+        .setParityUnits(schema.getNumParityUnits());
     Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
     for (Entry<String, String> entry : entrySet) {
       builder.addOptions(ECSchemaOptionEntryProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4c86bd3..a504907 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8169,6 +8169,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return null;
   }
 
+  /**
+   * Get available ECSchemas
+   */
+  ECSchema[] getECSchemas() throws IOException {
+    checkOperation(OperationCategory.READ);
+    waitForLoadingFSImage();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      // TODO HDFS-7866 Need to return all schemas maintained by Namenode
+      ECSchema defaultSchema = ECSchemaManager.getSystemDefaultSchema();
+      return new ECSchema[] { defaultSchema };
+    } finally {
+      readUnlock();
+    }
+  }
+
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index fe9b901..85ac1e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RetryCache;
@@ -2044,9 +2045,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
     nn.spanReceiverHost.removeSpanReceiver(id);
   }
 
-  @Override // ClientNameNodeProtocol
+  @Override // ClientProtocol
   public ECInfo getErasureCodingInfo(String src) throws IOException {
     checkNNStartup();
     return namesystem.getErasureCodingInfo(src);
   }
+
+  @Override // ClientProtocol
+  public ECSchema[] getECSchemas() throws IOException {
+    checkNNStartup();
+    return namesystem.getECSchemas();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 9488aed..3389a22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -729,6 +729,13 @@ message GetErasureCodingInfoResponseProto {
   optional ECInfoProto ECInfo = 1;
 }
 
+message GetECSchemasRequestProto { // void request
+}
+
+message GetECSchemasResponseProto {
+  repeated ECSchemaProto schemas = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -879,4 +886,6 @@ service ClientNamenodeProtocol {
       returns(GetEditsFromTxidResponseProto);
   rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
       returns(GetErasureCodingInfoResponseProto);
+  rpc getECSchemas(GetECSchemasRequestProto)
+      returns(GetECSchemasResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 1314ea0..0507538 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -637,8 +637,7 @@ message ECSchemaProto {
   required string codecName = 2;
   required uint32 dataUnits = 3;
   required uint32 parityUnits = 4;
-  required uint32 chunkSize = 5;
-  repeated ECSchemaOptionEntryProto options = 6;
+  repeated ECSchemaOptionEntryProto options = 5;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06562136/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
new file mode 100644
index 0000000..07e1359
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestECSchemas {
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void before() throws IOException {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(0)
+        .build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void after() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetECSchemas() throws Exception {
+    ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas();
+    // TODO update assertion after HDFS-7866
+    assertNotNull(ecSchemas);
+    assertEquals("Should have only one ecSchema", 1, ecSchemas.length);
+    assertEquals("Returned schemas should have only default schema",
+        ECSchemaManager.getSystemDefaultSchema(), ecSchemas[0]);
+  }
+}


[06/12] hadoop git commit: HDFS-8104 Make hard-coded values consistent with the system default schema first before remove them. Contributed by Kai Zheng

Posted by zh...@apache.org.
HDFS-8104 Make hard-coded values consistent with the system default schema first before remove them. Contributed by Kai Zheng


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8c0ecfb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8c0ecfb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8c0ecfb

Branch: refs/heads/HDFS-7285
Commit: d8c0ecfb0be9c998a32480d14b392a378cc9e6a9
Parents: cc11b5c
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 10 00:16:28 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   4 +-
 .../hadoop/hdfs/protocol/HdfsConstants.java     |  12 +-
 .../hadoop/hdfs/TestPlanReadPortions.java       | 142 +++++++++++++++++++
 .../apache/hadoop/hdfs/TestReadStripedFile.java | 112 ---------------
 4 files changed, 154 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 5078a15..1e695c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -54,4 +54,6 @@
     HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
     NameNode (vinayakumarb)
     
-    HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
\ No newline at end of file
+    HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
+
+    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index a888aa4..11c5260 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -180,11 +180,17 @@ public class HdfsConstants {
   public static final byte WARM_STORAGE_POLICY_ID = 5;
   public static final byte COLD_STORAGE_POLICY_ID = 2;
 
-  public static final byte NUM_DATA_BLOCKS = 3;
-  public static final byte NUM_PARITY_BLOCKS = 2;
+
   public static final long BLOCK_GROUP_INDEX_MASK = 15;
   public static final byte MAX_BLOCKS_IN_GROUP = 16;
 
+  /*
+   * These values correspond to the values used by the system default schema.
+   * TODO: to be removed once all places use schema.
+   */
+
+  public static final byte NUM_DATA_BLOCKS = 6;
+  public static final byte NUM_PARITY_BLOCKS = 3;
   // The chunk size for striped block which is used by erasure coding
-  public static final int BLOCK_STRIPED_CELL_SIZE = 128 * 1024;
+  public static final int BLOCK_STRIPED_CELL_SIZE = 256 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
new file mode 100644
index 0000000..cf84b30
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.*;
+
+public class TestPlanReadPortions {
+
+  // This test only covers this fixed number of data blocks. That is good
+  // enough for now, although it is not yet flexible for any number in a schema.
+  private final short GROUP_SIZE = 3;
+  private final int CELLSIZE = 128 * 1024;
+
+  private void testPlanReadPortions(int startInBlk, int length,
+      int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+      int[][] bufferOffsets, int[][] bufferLengths) {
+    ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+        CELLSIZE, startInBlk, length, bufferOffset);
+    assertEquals(GROUP_SIZE, results.length);
+
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      assertEquals(readLengths[i], results[i].getReadLength());
+      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+      final int[] bOffsets = results[i].getOffsets();
+      assertArrayEquals(bufferOffsets[i], bOffsets);
+      final int[] bLengths = results[i].getLengths();
+      assertArrayEquals(bufferLengths[i], bLengths);
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#planReadPortions}
+   */
+  @Test
+  public void testPlanReadPortions() {
+    /**
+     * start block offset is 0, read cellSize - 10
+     */
+    testPlanReadPortions(0, CELLSIZE - 10, 0,
+        new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 3 * cellSize
+     */
+    testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 0, read cellSize + 10
+     */
+    testPlanReadPortions(0, CELLSIZE + 10, 0,
+        new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+     */
+    testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+        new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+            new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+            new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, 10}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+            new int[]{100 + CELLSIZE - 2},
+            new int[]{100 + CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 2},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize + 10
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+            new int[]{CELLSIZE - 2},
+            new int[]{CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 12},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+        new int[]{CELLSIZE, CELLSIZE - 1, 0},
+        new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+            new int[]{1, 3 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{1, CELLSIZE, 9},
+            new int[]{CELLSIZE, CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+        new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+            new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE, 9},
+            new int[]{1, CELLSIZE, CELLSIZE}});
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8c0ecfb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 0032bdd..849e12e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -78,117 +77,6 @@ public class TestReadStripedFile {
     }
   }
 
-  private void testPlanReadPortions(int startInBlk, int length,
-      int bufferOffset, int[] readLengths, int[] offsetsInBlock,
-      int[][] bufferOffsets, int[][] bufferLengths) {
-    ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
-        CELLSIZE, startInBlk, length, bufferOffset);
-    assertEquals(GROUP_SIZE, results.length);
-
-    for (int i = 0; i < GROUP_SIZE; i++) {
-      assertEquals(readLengths[i], results[i].getReadLength());
-      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
-      final int[] bOffsets = results[i].getOffsets();
-      assertArrayEquals(bufferOffsets[i], bOffsets);
-      final int[] bLengths = results[i].getLengths();
-      assertArrayEquals(bufferLengths[i], bLengths);
-    }
-  }
-
-  /**
-   * Test {@link DFSStripedInputStream#planReadPortions}
-   */
-  @Test
-  public void testPlanReadPortions() {
-    /**
-     * start block offset is 0, read cellSize - 10
-     */
-    testPlanReadPortions(0, CELLSIZE - 10, 0,
-        new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{}, new int[]{}},
-        new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
-
-    /**
-     * start block offset is 0, read 3 * cellSize
-     */
-    testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
-        new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
-        new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is 0, read cellSize + 10
-     */
-    testPlanReadPortions(0, CELLSIZE + 10, 0,
-        new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
-        new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
-
-    /**
-     * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
-     */
-    testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
-        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
-        new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
-            new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
-            new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, 10}});
-
-    /**
-     * start block offset is 2, read 3 * cellSize
-     */
-    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
-        new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
-        new int[]{2, 0, 0},
-        new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
-            new int[]{100 + CELLSIZE - 2},
-            new int[]{100 + CELLSIZE * 2 - 2}},
-        new int[][]{new int[]{CELLSIZE - 2, 2},
-            new int[]{CELLSIZE},
-            new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is 2, read 3 * cellSize + 10
-     */
-    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
-        new int[]{2, 0, 0},
-        new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
-            new int[]{CELLSIZE - 2},
-            new int[]{CELLSIZE * 2 - 2}},
-        new int[][]{new int[]{CELLSIZE - 2, 12},
-            new int[]{CELLSIZE},
-            new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
-     */
-    testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
-        new int[]{CELLSIZE, CELLSIZE - 1, 0},
-        new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
-            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
-            new int[]{1, 3 * CELLSIZE + 1}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE},
-            new int[]{1, CELLSIZE, 9},
-            new int[]{CELLSIZE, CELLSIZE}});
-
-    /**
-     * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
-     */
-    testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
-        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
-        new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
-            new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
-            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, CELLSIZE, 9},
-            new int[]{1, CELLSIZE, CELLSIZE}});
-  }
-
   private LocatedStripedBlock createDummyLocatedBlock() {
     final long blockGroupID = -1048576;
     DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];


[10/12] hadoop git commit: HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo

Posted by zh...@apache.org.
HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0331f20a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0331f20a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0331f20a

Branch: refs/heads/HDFS-7285
Commit: 0331f20aec85312eec1093a5030fb3d94d5cedef
Parents: 58d9f26
Author: Kai Zheng <ka...@intel.com>
Authored: Sat Apr 11 01:03:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   4 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  13 +-
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  26 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 439 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  12 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 241 ++++++++++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java | 311 +++++++++++++
 7 files changed, 1031 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1e695c4..753795a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -56,4 +56,6 @@
     
     HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
 
-    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
+    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
+
+    HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 8cde274..8270331 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -268,8 +268,14 @@ public class DFSOutputStream extends FSOutputSummer
         }
       }
       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-          flag, progress, checksum, favoredNodes);
+      final DFSOutputStream out;
+      if(stat.getReplication() == 0) {
+        out = new DFSStripedOutputStream(dfsClient, src, stat,
+            flag, progress, checksum, favoredNodes);
+      } else {
+        out = new DFSOutputStream(dfsClient, src, stat,
+            flag, progress, checksum, favoredNodes);
+      }
       out.start();
       return out;
     } finally {
@@ -347,6 +353,9 @@ public class DFSOutputStream extends FSOutputSummer
       String[] favoredNodes) throws IOException {
     TraceScope scope =
         dfsClient.getPathTraceScope("newStreamForAppend", src);
+	if(stat.getReplication() == 0) {
+      throw new IOException("Not support appending to a striping layout file yet.");
+    }
     try {
       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
           progress, lastBlock, stat, checksum);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 22055c3..9cd1ec1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
@@ -113,6 +114,19 @@ class DFSPacket {
     dataPos += len;
   }
 
+  synchronized void writeData(ByteBuffer inBuffer, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    len =  len > inBuffer.remaining() ? inBuffer.remaining() : len;
+    if (dataPos + len > buf.length) {
+      throw new BufferOverflowException();
+    }
+    for (int i = 0; i < len; i++) {
+      buf[dataPos + i] = inBuffer.get();
+    }
+    dataPos += len;
+  }
+
   /**
    * Write checksums to this packet
    *
@@ -222,7 +236,7 @@ class DFSPacket {
    *
    * @return true if the packet is the last packet
    */
-  boolean isLastPacketInBlock(){
+  boolean isLastPacketInBlock() {
     return lastPacketInBlock;
   }
 
@@ -231,7 +245,7 @@ class DFSPacket {
    *
    * @return the sequence number of this packet
    */
-  long getSeqno(){
+  long getSeqno() {
     return seqno;
   }
 
@@ -240,14 +254,14 @@ class DFSPacket {
    *
    * @return the number of chunks in this packet
    */
-  synchronized int getNumChunks(){
+  synchronized int getNumChunks() {
     return numChunks;
   }
 
   /**
    * increase the number of chunks by one
    */
-  synchronized void incNumChunks(){
+  synchronized void incNumChunks() {
     numChunks++;
   }
 
@@ -256,7 +270,7 @@ class DFSPacket {
    *
    * @return the maximum number of packets
    */
-  int getMaxChunks(){
+  int getMaxChunks() {
     return maxChunks;
   }
 
@@ -265,7 +279,7 @@ class DFSPacket {
    *
    * @param syncBlock if to sync block
    */
-  synchronized void setSyncBlock(boolean syncBlock){
+  synchronized void setSyncBlock(boolean syncBlock) {
     this.syncBlock = syncBlock;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
new file mode 100644
index 0000000..aded4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+
+/****************************************************************
+ * The DFSStripedOutputStream class supports writing files in striped
+ * layout. Each stripe contains a sequence of cells and multiple
+ * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
+ * for writing the cells to different datanodes.
+ *
+ ****************************************************************/
+
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+
+  private final List<StripedDataStreamer> streamers;
+  /**
+   * Size of each striping cell, must be a multiple of bytesPerChecksum
+   */
+  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private ByteBuffer[] cellBuffers;
+  private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+      + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private int curIdx = 0;
+  /* bytes written in current block group */
+  private long currentBlockGroupBytes = 0;
+
+  //TODO: Use ErasureCoder interface (HDFS-7781)
+  private RawErasureEncoder encoder;
+
+  private StripedDataStreamer getLeadingStreamer() {
+    return streamers.get(0);
+  }
+
+  private long getBlockGroupSize() {
+    return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
+  }
+
+  /** Construct a new output stream for creating a file. */
+  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+                         EnumSet<CreateFlag> flag, Progressable progress,
+                         DataChecksum checksum, String[] favoredNodes)
+                         throws IOException {
+    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+    DFSClient.LOG.info("Creating striped output stream");
+    if (blockGroupBlocks <= 1) {
+      throw new IOException("The block group must contain more than one block.");
+    }
+
+    cellBuffers = new ByteBuffer[blockGroupBlocks];
+    List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+      try {
+        cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+      } catch (InterruptedException ie) {
+        final InterruptedIOException iioe = new InterruptedIOException(
+            "create cell buffers");
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    encoder = new RSRawEncoder();
+    encoder.initialize(blockGroupDataBlocks,
+        blockGroupBlocks - blockGroupDataBlocks, cellSize);
+
+    streamers = new ArrayList<>(blockGroupBlocks);
+    for (short i = 0; i < blockGroupBlocks; i++) {
+      StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+          i, stripeBlocks);
+      if (favoredNodes != null && favoredNodes.length != 0) {
+        streamer.setFavoredNodes(favoredNodes);
+      }
+      streamers.add(streamer);
+    }
+
+    refreshStreamer();
+  }
+
+  private void refreshStreamer() {
+    streamer = streamers.get(curIdx);
+  }
+
+  private void moveToNextStreamer() {
+    curIdx = (curIdx + 1) % blockGroupBlocks;
+    refreshStreamer();
+  }
+
+  /**
+   * Split the buffers into data and parity groups and encode them.
+   * No buffer is flipped here; callers flip data buffers before encoding.
+   *
+   * @param buffers data buffers + parity buffers
+   */
+  private void encode(ByteBuffer[] buffers) {
+    ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
+    ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      if (i < blockGroupDataBlocks) {
+        dataBuffers[i] = buffers[i];
+      } else {
+        parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+      }
+    }
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  /**
+   * Generate packets from a given buffer
+   *
+   * @param byteBuffer the given buffer to generate packets
+   * @return packets generated
+   * @throws IOException
+   */
+  private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
+      throws IOException{
+    List<DFSPacket> packets = new ArrayList<>();
+    while (byteBuffer.remaining() > 0) {
+      DFSPacket p = createPacket(packetSize, chunksPerPacket,
+          streamer.getBytesCurBlock(),
+          streamer.getAndIncCurrentSeqno(), false);
+      int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
+      int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
+          maxBytesToPacket: byteBuffer.remaining();
+      p.writeData(byteBuffer, toWrite);
+      streamer.incBytesCurBlock(toWrite);
+      packets.add(p);
+    }
+    return packets;
+  }
+
+  @Override
+  protected synchronized void writeChunk(byte[] b, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    super.writeChunk(b, offset, len, checksum, ckoff, cklen);
+    // The chunk must fit in what is left of the current cell buffer.
+    if (getSizeOfCellnBuffer(curIdx) + len <= cellSize) {
+      addToCellBuffer(b, offset, len);
+    } else {
+      String msg = "Writing a chunk should not overflow the cell buffer.";
+      DFSClient.LOG.info(msg);
+      throw new IOException(msg);
+    }
+
+
+    // If current packet has not been enqueued for transmission,
+    // but the cell buffer is full, we need to enqueue the packet
+    if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
+            currentPacket.getSeqno() +
+            ", curIdx=" + curIdx +
+            ", src=" + src +
+            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+            ", blockSize=" + blockSize +
+            ", appendChunk=" + streamer.getAppendChunk());
+      }
+      streamer.waitAndQueuePacket(currentPacket);
+      currentPacket = null;
+      adjustChunkBoundary();
+      endBlock();
+    }
+
+    // Two extra steps are needed when a striping cell is full:
+    // 1. Forward the current index pointer
+    // 2. Generate parity packets if a full stripe of data cells are present
+    if (getSizeOfCellnBuffer(curIdx) == cellSize) {
+      //move curIdx to next cell
+      moveToNextStreamer();
+      //When all data cells in a stripe are ready, we need to encode
+      //them and generate some parity cells. These cells will be
+      //converted to packets and put to their DataStreamer's queue.
+      if (curIdx == blockGroupDataBlocks) {
+        //encode the data cells
+        for (int k = 0; k < blockGroupDataBlocks; k++) {
+          cellBuffers[k].flip();
+        }
+        encode(cellBuffers);
+        for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+          ByteBuffer parityBuffer = cellBuffers[i];
+          List<DFSPacket> packets = generatePackets(parityBuffer);
+          for (DFSPacket p : packets) {
+            currentPacket = p;
+            streamer.waitAndQueuePacket(currentPacket);
+            currentPacket = null;
+          }
+          endBlock();
+          moveToNextStreamer();
+        }
+        //read next stripe to cellBuffers
+        clearCellBuffers();
+      }
+    }
+  }
+
+  private void addToCellBuffer(byte[] b, int off, int len) {
+    cellBuffers[curIdx].put(b, off, len);
+  }
+
+  private int getSizeOfCellnBuffer(int cellIndex) {
+    return cellBuffers[cellIndex].position();
+  }
+
+  private void clearCellBuffers() {
+    for (int i = 0; i< blockGroupBlocks; i++) {
+      cellBuffers[i].clear();
+    }
+  }
+
+  private int stripeDataSize() {
+    return blockGroupDataBlocks * cellSize;
+  }
+
+  private void notSupported(String headMsg)
+      throws IOException{
+      throw new IOException(
+          headMsg + " is now not supported for striping layout.");
+  }
+
+  @Override
+  public void hflush() throws IOException {
+    notSupported("hflush");
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    notSupported("hsync");
+  }
+
+
+  @Override
+  protected synchronized void start() {
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.start();
+    }
+  }
+
+  @Override
+  synchronized void abort() throws IOException {
+    if (isClosed()) {
+      return;
+    }
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.setLastException(new IOException("Lease timeout of "
+          + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+    }
+    closeThreads(true);
+    dfsClient.endFileLease(fileId);
+  }
+
+  //TODO: Handle slow writers (HDFS-7786)
+  //Currently only check if the leading streamer is terminated
+  boolean isClosed() {
+    return closed || getLeadingStreamer().streamerClosed();
+  }
+
+  // Shut down all streamers: close each one, wait for its thread to exit,
+  // then close its socket. The force flag is handed to streamer.close().
+  @Override
+  protected void closeThreads(boolean force) throws IOException {
+    StripedDataStreamer leadingStreamer = null;
+    for (StripedDataStreamer streamer : streamers) {
+      try {
+        streamer.close(force);
+        streamer.join();
+        streamer.closeSocket();
+        if (streamer.isLeadingStreamer()) {
+          leadingStreamer = streamer;
+        } else {
+          streamer.countTailingBlockGroupBytes();
+        }
+      } catch (InterruptedException e) {
+        // chain the interrupt so the reason for the failure is not lost
+        throw new IOException("Failed to shutdown streamer", e);
+      } finally {
+        streamer.setSocketToNull();
+        setClosed();
+      }
+    }
+    leadingStreamer.countTailingBlockGroupBytes();
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    super.write(b);
+    currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len)
+      throws IOException {
+    super.write(b, off, len);
+    currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
+  }
+
+  private void writeParityCellsForLastStripe() throws IOException{
+    if(currentBlockGroupBytes == 0 ||
+        currentBlockGroupBytes % stripeDataSize() == 0)
+      return;
+    int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
+    // Size of parity cells should equal the size of the first cell, if it
+    // is not full.
+    int parityCellSize = cellSize;
+    int index = lastStripeLen / cellSize;
+    if (lastStripeLen < cellSize) {
+      parityCellSize = lastStripeLen;
+      index++;
+    }
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      if (i >= index) {
+        int position = cellBuffers[i].position();
+        for (int j = 0; j < parityCellSize - position; j++) {
+          cellBuffers[i].put((byte)0);
+        }
+      }
+      cellBuffers[i].flip();
+    }
+    encode(cellBuffers);
+
+    //write parity cells
+    curIdx = blockGroupDataBlocks;
+    refreshStreamer();
+    for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+      ByteBuffer parityBuffer = cellBuffers[i];
+      List<DFSPacket> packets = generatePackets(parityBuffer);
+      for (DFSPacket p : packets) {
+        currentPacket = p;
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+      }
+      endBlock();
+      moveToNextStreamer();
+    }
+
+    clearCellBuffers();
+  }
+
+  @Override
+  void setClosed() {
+    super.setClosed();
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      byteArrayManager.release(cellBuffers[i].array());
+      streamers.get(i).release();
+    }
+  }
+
+  @Override
+  protected synchronized void closeImpl() throws IOException {
+    if (isClosed()) {
+      IOException e = getLeadingStreamer().getLastException().getAndSet(null);
+      if (e == null)
+        return;
+      else
+        throw e;
+    }
+
+    try {
+      // flush from all upper layers
+      flushBuffer();
+      if (currentPacket != null) {
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+      }
+      //if the last stripe is incomplete, generate and write parity cells
+      writeParityCellsForLastStripe();
+
+      for (int i = 0; i < blockGroupBlocks; i++) {
+        curIdx = i;
+        refreshStreamer();
+        if (streamer.getBytesCurBlock()!= 0 ||
+            currentBlockGroupBytes < getBlockGroupSize()) {
+          // send an empty packet to mark the end of the block
+          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+              streamer.getAndIncCurrentSeqno(), true);
+          currentPacket.setSyncBlock(shouldSyncBlock);
+        }
+        // flush all data to Datanode
+        flushInternal();
+      }
+
+      // get last block before destroying the streamer
+      ExtendedBlock lastBlock = streamers.get(0).getBlock();
+      closeThreads(false);
+      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+      try {
+        completeFile(lastBlock);
+      } finally {
+        scope.close();
+      }
+      dfsClient.endFileLease(fileId);
+    } catch (ClosedChannelException e) {
+    } finally {
+      setClosed();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index d53e73f..1b99d60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -169,7 +169,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  private ExtendedBlock block; // its length is number of bytes acked
+  protected ExtendedBlock block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -177,7 +177,7 @@ class DataStreamer extends Daemon {
   private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
-  private String[] favoredNodes;
+  protected String[] favoredNodes;
   volatile boolean hasError = false;
   volatile int errorIndex = -1;
   // Restarting node index
@@ -204,12 +204,12 @@ class DataStreamer extends Daemon {
   private final AtomicReference<IOException> lastException = new AtomicReference<>();
   private Socket s;
 
-  private final DFSClient dfsClient;
-  private final String src;
+  protected final DFSClient dfsClient;
+  protected final String src;
   /** Only for DataTransferProtocol.writeBlock(..) */
   private final DataChecksum checksum4WriteBlock;
   private final Progressable progress;
-  private final HdfsFileStatus stat;
+  protected final HdfsFileStatus stat;
   // appending to existing partial block
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
@@ -332,7 +332,7 @@ class DataStreamer extends Daemon {
     stage = BlockConstructionStage.DATA_STREAMING;
   }
 
-  private void endBlock() {
+  protected void endBlock() {
     if(DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Closing old block " + block);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
new file mode 100644
index 0000000..710d92d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.List;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/****************************************************************************
+ * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
+ * There are two kinds of StripedDataStreamer: the leading streamer
+ * (index 0) and ordinary streamers. The leading streamer requests a block
+ * group from the NameNode, unwraps it into located blocks and transfers each
+ * located block to its corresponding ordinary streamer via a blocking queue.
+ * Ordinary data streamers report their finished blocks back to the leading
+ * streamer through queue 0, so that it can add up the block group size.
+ *
+ ****************************************************************************/
+public class StripedDataStreamer extends DataStreamer {
+  /** Number of blocks (data + parity) in one block group. */
+  private static final short BLOCK_GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS
+      + HdfsConstants.NUM_PARITY_BLOCKS;
+  /** Seconds to wait when exchanging a finished block via queue 0. */
+  private static final long FINISHED_BLOCK_TIMEOUT_SEC = 30;
+  /** Seconds to wait when handing out or receiving a new block. */
+  private static final long NEW_BLOCK_TIMEOUT_SEC = 90;
+
+  /** Position of this streamer inside the block group; 0 is the leader. */
+  private final short index;
+  /** One queue per streamer, indexed by streamer index. */
+  private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
+  /** Set once the leading streamer has requested its first block group. */
+  private boolean hasCommittedBlock = false;
+
+  StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                      DFSClient dfsClient, String src,
+                      Progressable progress, DataChecksum checksum,
+                      AtomicReference<CachingStrategy> cachingStrategy,
+                      ByteArrayManager byteArrayManage, short index,
+                      List<BlockingQueue<LocatedBlock>> stripedBlocks) {
+    super(stat,block, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    this.index = index;
+    this.stripedBlocks = stripedBlocks;
+  }
+
+  /**
+   * Construct a data streamer for appending to the last partial block
+   * @param lastBlock last block of the file to be appended
+   * @param stat status of the file to be appended
+   * @throws IOException if error occurs
+   */
+  StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
+                      DFSClient dfsClient, String src,
+                      Progressable progress, DataChecksum checksum,
+                      AtomicReference<CachingStrategy> cachingStrategy,
+                      ByteArrayManager byteArrayManage, short index,
+                      List<BlockingQueue<LocatedBlock>> stripedBlocks)
+      throws IOException {
+    super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    this.index = index;
+    this.stripedBlocks = stripedBlocks;
+  }
+
+  /** @return true if this streamer has index 0 */
+  public boolean isLeadingStreamer () {
+    return index == 0;
+  }
+
+  /** @return true if this streamer's index lies in the parity range */
+  private boolean isParityStreamer() {
+    return index >= HdfsConstants.NUM_DATA_BLOCKS;
+  }
+
+  /**
+   * Ordinary data streamers report the block they just finished to the
+   * leading streamer (queue 0) before the regular end-of-block handling.
+   */
+  @Override
+  protected void endBlock() {
+    if (!isLeadingStreamer() && !isParityStreamer()) {
+      //before retrieving a new block, transfer the finished block to
+      //leading streamer
+      LocatedBlock finishedBlock = new LocatedBlock(
+          new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
+                       block.getNumBytes(),block.getGenerationStamp()), null);
+      offerToLeadingStreamer(finishedBlock);
+    }
+    super.endBlock();
+  }
+
+  /**
+   * This function is called after the streamer is closed.
+   *
+   * The leading streamer adds the sizes reported by the other data
+   * streamers to {@link #block}. An ordinary data streamer whose block is
+   * missing or empty offers a placeholder on queue 0, so that the leading
+   * streamer still finds an entry for it.
+   *
+   * @throws IOException if a finished block does not arrive in time
+   */
+  void countTailingBlockGroupBytes () throws IOException {
+    if (isLeadingStreamer()) {
+      //when committing a block group, leading streamer has to adjust
+      // {@link block} including the size of block group
+      addFinishedBlockBytes();
+    } else if (!isParityStreamer()) {
+      if (block == null || block.getNumBytes() == 0) {
+        LocatedBlock finishedBlock = new LocatedBlock(null, null);
+        offerToLeadingStreamer(finishedBlock);
+      }
+    }
+
+  }
+
+  /**
+   * The leading streamer asks the NameNode for a new block group (after
+   * folding the sizes of the previous group's data blocks into
+   * {@link #block}), keeps the first block for itself and distributes the
+   * others to the queues of the remaining streamers. Every other streamer
+   * takes its block from its own queue.
+   *
+   * @param excludedNodes passed through to the superclass implementation
+   * @return the block this streamer should write next, never null
+   * @throws IOException if a block cannot be obtained or handed over in time
+   */
+  @Override
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+      throws IOException {
+    if (!isLeadingStreamer()) {
+      return pollAllocatedBlock();
+    }
+
+    if (hasCommittedBlock) {
+      //when committing a block group, leading streamer has to adjust
+      // {@link block} including the size of block group
+      addFinishedBlockBytes();
+    }
+
+    LocatedBlock lb = super.locateFollowingBlock(excludedNodes);
+    hasCommittedBlock = true;
+    LocatedBlock[] blocks = unwrapBlockGroup(lb);
+    assert blocks.length == BLOCK_GROUP_SIZE :
+        "Fail to get block group from namenode: blockGroupSize: " +
+            BLOCK_GROUP_SIZE + ", blocks.length: " + blocks.length;
+
+    // Keep delivering to the remaining streamers even if interrupted, and
+    // restore the interrupt status once the loop is done.
+    boolean interrupted = false;
+    try {
+      for (int i = 1; i < blocks.length; i++) {
+        try {
+          boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
+              NEW_BLOCK_TIMEOUT_SEC, TimeUnit.SECONDS);
+          if(!offSuccess){
+            String msg = "Fail to put block to stripeBlocks. i = " + i;
+            DFSClient.LOG.info(msg);
+            throw new IOException(msg);
+          } else {
+            DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+                + ", block: " + blocks[i]);
+          }
+        } catch (InterruptedException ie) {
+          interrupted = true;
+          DFSClient.LOG.info("InterruptedException received when putting" +
+              " a block to stripeBlocks, ie = " + ie);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return blocks[0];
+  }
+
+  /**
+   * Takes the block allocated for this (non-leading) streamer from its own
+   * queue, waiting up to {@link #NEW_BLOCK_TIMEOUT_SEC} seconds.
+   *
+   * @return the allocated block, never null
+   * @throws IOException if no block arrives in time or the wait is interrupted
+   */
+  private LocatedBlock pollAllocatedBlock() throws IOException {
+    LocatedBlock lb = null;
+    try {
+      //wait 90 seconds to get a block from the queue
+      lb = stripedBlocks.get(index).poll(NEW_BLOCK_TIMEOUT_SEC,
+          TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      DFSClient.LOG.info("InterruptedException received when retrieving " +
+          "a block from stripeBlocks, ie = " + ie);
+      Thread.currentThread().interrupt();
+    }
+    if (lb == null) {
+      // Fail with a descriptive error instead of handing null to the caller.
+      throw new IOException("Fail to get a block from stripeBlocks for " +
+          "streamer, index = " + index);
+    }
+    return lb;
+  }
+
+  /**
+   * Offers a finished block to the leading streamer through queue 0,
+   * waiting up to {@link #FINISHED_BLOCK_TIMEOUT_SEC} seconds. Failures are
+   * logged only, because {@link #endBlock()} cannot throw a checked
+   * exception.
+   *
+   * @param finishedBlock the block (or placeholder) to report
+   */
+  private void offerToLeadingStreamer(LocatedBlock finishedBlock) {
+    try {
+      boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock,
+          FINISHED_BLOCK_TIMEOUT_SEC, TimeUnit.SECONDS);
+      if (!offSuccess) {
+        DFSClient.LOG.warn("Fail to put finished block to stripeBlocks " +
+            "for the leading streamer, index = " + index);
+      }
+    } catch (InterruptedException ie) {
+      //TODO: Handle InterruptedException (HDFS-7786)
+      DFSClient.LOG.info("InterruptedException received when putting" +
+          " a finished block to stripeBlocks, ie = " + ie);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Polls queue 0 once per ordinary data streamer and adds the reported
+   * block sizes to {@link #block}. A placeholder without a block counts as
+   * zero bytes. An interrupted poll is logged and skipped; the interrupt
+   * status is restored when the loop ends.
+   *
+   * @throws IOException if a poll times out without receiving a block
+   */
+  private void addFinishedBlockBytes() throws IOException {
+    boolean interrupted = false;
+    try {
+      for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+        try {
+          LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(
+              FINISHED_BLOCK_TIMEOUT_SEC, TimeUnit.SECONDS);
+          if (finishedLocatedBlock == null) {
+            throw new IOException("Fail to get finished LocatedBlock " +
+                "from streamer, i=" + i);
+          }
+          ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+          long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+          if (block != null) {
+            block.setNumBytes(block.getNumBytes() + bytes);
+          }
+        } catch (InterruptedException ie) {
+          interrupted = true;
+          DFSClient.LOG.info("InterruptedException received when " +
+              "retrieving a block from stripeBlocks, ie = " + ie);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Generate all blocks in a block group according to the first one. The
+   * i-th result uses block id {@code firstId + i} and the i-th location,
+   * storage id and storage type of the given block, so element 0 describes
+   * the first block itself.
+   *
+   * @param firstBlockInGroup the first block in a block group
+   * @return one located block per location of {@code firstBlockInGroup}
+   */
+  public static LocatedBlock[] unwrapBlockGroup(
+      final LocatedBlock firstBlockInGroup) {
+    ExtendedBlock eb = firstBlockInGroup.getBlock();
+    DatanodeInfo[] locs = firstBlockInGroup.getLocations();
+    String[] storageIDs = firstBlockInGroup.getStorageIDs();
+    StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
+    Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
+    LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
+    for (int i = 0; i < blocksInGroup.length; i++) {
+      //each block in a group has the same number of bytes and timestamp
+      ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
+          eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
+      blocksInGroup[i] = new LocatedBlock(extendedBlock,
+          new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
+          new StorageType[] {storageTypes[i]});
+      blocksInGroup[i].setBlockToken(blockToken);
+    }
+    return blocksInGroup;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0331f20a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
new file mode 100644
index 0000000..f5a37f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -0,0 +1,311 @@
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Writes files of various sizes into an erasure coding zone and verifies,
+ * by reading every internal block directly from its DataNode, that the data
+ * cells were laid out in the expected striped order.
+ */
+public class TestDFSStripedOutputStream {
+  private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+  private MiniDFSCluster cluster;
+  // Used both to start the cluster and to build the block readers, so that
+  // writers and readers share one configuration.
+  private final Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  int blockSize = 8 * 1024 * 1024;
+  int cellsInBlock = blockSize / cellSize;
+  // Modulus of the repeating byte pattern produced by getByte().
+  private int mod = 29;
+
+  /** Starts a cluster with two spare DataNodes and makes "/" an EC zone. */
+  @Before
+  public void setup() throws IOException {
+    int numDNs = dataBlocks + parityBlocks + 2;
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/");
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void TestFileEmpty() throws IOException {
+    testOneFile("/EmptyFile", 0);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneCell1() throws IOException {
+    testOneFile("/SmallerThanOneCell", 1);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneCell2() throws IOException {
+    testOneFile("/SmallerThanOneCell", cellSize - 1);
+  }
+
+  @Test
+  public void TestFileEqualsWithOneCell() throws IOException {
+    testOneFile("/EqualsWithOneCell", cellSize);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneStripe1() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneStripe2() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize + 123);
+  }
+
+  @Test
+  public void TestFileEqualsWithOneStripe() throws IOException {
+    testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
+  }
+
+  @Test
+  public void TestFileMoreThanOneStripe1() throws IOException {
+    testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void TestFileMoreThanOneStripe2() throws IOException {
+    testOneFile("/MoreThanOneStripe2",
+        cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+            + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void TestFileFullBlockGroup() throws IOException {
+    testOneFile("/FullBlockGroup", blockSize * dataBlocks);
+  }
+
+  //TODO: The following tests will pass after HDFS-8121 fixed
+//  @Test
+  public void TestFileMoreThanABlockGroup1() throws IOException {
+    testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+  }
+
+  //  @Test
+  public void TestFileMoreThanABlockGroup2() throws IOException {
+    testOneFile("/MoreThanABlockGroup2",
+        blockSize * dataBlocks * 3
+            + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+            + 123);
+  }
+
+  /** @return number of data bytes in one full stripe */
+  private int stripeDataSize() {
+    return cellSize * dataBlocks;
+  }
+
+  /** @return {@code cnt} bytes following the pattern of {@link #getByte} */
+  private byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  /**
+   * @return the expected byte at file offset {@code pos}; always in
+   *         [1, mod], so it is never 0 and stays within the ASCII range
+   */
+  private byte getByte(long pos) {
+    return (byte) (pos % mod + 1);
+  }
+
+  /**
+   * Writes {@code writeBytes} bytes to {@code src} and reads them back with
+   * a positional read on a {@link DFSStripedInputStream}.
+   */
+  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    if (fileLength != writeBytes) {
+      Assert.fail("File Length error: expect=" + writeBytes
+          + ", actual=" + fileLength);
+    }
+
+    byte[] buf = new byte[writeBytes + 100];
+    int readLen;
+    // Close the stream even if the read fails.
+    try (DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true)) {
+      readLen = dis.read(0, buf, 0, buf.length);
+    }
+    readLen = readLen >= 0 ? readLen : 0;
+    if (readLen != writeBytes) {
+      Assert.fail("The length of file is not correct.");
+    }
+
+    for (int i = 0; i < writeBytes; i++) {
+      if (getByte(i) != buf[i]) {
+        Assert.fail("Byte at i = " + i + " is wrongly written.");
+      }
+    }
+  }
+
+  /**
+   * Writes {@code writeBytes} bytes to {@code src}, then reads each internal
+   * block of each block group straight from its DataNode and checks every
+   * data byte against the expected pattern. Parity blocks are read but their
+   * content is not verified.
+   */
+  private void testOneFile(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+
+    int allBlocks = dataBlocks + parityBlocks;
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    if (fileLength != writeBytes) {
+      Assert.fail("File Length error: expect=" + writeBytes
+          + ", actual=" + fileLength);
+    }
+
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
+      List<LocatedBlock> oneGroup = Arrays.asList(blocks);
+      blockGroupList.add(oneGroup);
+    }
+
+    //test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[dataBlocks][];
+      byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+
+      //calculate the size of this block group
+      int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
+          blockSize * dataBlocks :
+          writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
+      int intactStripes = lenOfBlockGroup / stripeDataSize();
+      int lastStripeLen = lenOfBlockGroup % stripeDataSize();
+
+      //for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        LocatedBlock lblock = blockList.get(i);
+        if (lblock == null) {
+          continue;
+        }
+        DatanodeInfo[] nodes = lblock.getLocations();
+        ExtendedBlock block = lblock.getBlock();
+        InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+            nodes[0].getXferAddr());
+
+        // Length of this block's cell in the last, partial stripe.
+        int lenOfCell = cellSize;
+        if (i == lastStripeLen / cellSize) {
+          lenOfCell = lastStripeLen % cellSize;
+        } else if (i > lastStripeLen / cellSize) {
+          lenOfCell = 0;
+        }
+        int lenOfBlock = cellSize * intactStripes + lenOfCell;
+        byte[] blockBytes = new byte[lenOfBlock];
+        if (i < dataBlocks) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - dataBlocks] = blockBytes;
+        }
+
+        if (lenOfBlock == 0) {
+          continue;
+        }
+
+        block.setNumBytes(lenOfBlock);
+        BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+            setFileName(src).
+            setBlock(block).
+            setBlockToken(lblock.getBlockToken()).
+            setInetSocketAddress(targetAddr).
+            setStartOffset(0).
+            setLength(block.getNumBytes()).
+            setVerifyChecksum(true).
+            setClientName("TestStripeLayoutWrite").
+            setDatanodeInfo(nodes[0]).
+            setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+            setClientCacheContext(ClientContext.getFromConf(conf)).
+            setConfiguration(conf).
+            setRemotePeerFactory(new RemotePeerFactory() {
+              @Override
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                                           Token<BlockTokenIdentifier> blockToken,
+                                           DatanodeID datanodeId)
+                  throws IOException {
+                Peer peer = null;
+                Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+                try {
+                  sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                  sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                  peer = TcpPeerServer.peerFromSocket(sock);
+                } finally {
+                  if (peer == null) {
+                    IOUtils.closeSocket(sock);
+                  }
+                }
+                return peer;
+              }
+            }).build();
+
+        // Release the reader even if the read throws.
+        try {
+          blockReader.readAll(blockBytes, 0, lenOfBlock);
+        } finally {
+          blockReader.close();
+        }
+      }
+
+      //check if we write the data correctly
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        byte[] cells = dataBlockBytes[i];
+        if (cells == null) {
+          continue;
+        }
+        for (int j = 0; j < cells.length; j++) {
+          byte expected;
+          //calculate the position of this byte in the file; the group
+          //offset is computed as a long so it cannot overflow int
+          long pos = (long) group * dataBlocks * blockSize
+              + (i * cellSize + j / cellSize * cellSize * dataBlocks)
+              + j % cellSize;
+          if (pos >= writeBytes) {
+            expected = 0;
+          } else {
+            expected = getByte(pos);
+          }
+
+          if (expected != cells[j]) {
+            Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+                + ". Block group index is " + group +
+                ", stripe index is " + j / cellSize +
+                ", cell index is " + i + ", byte index is " + j % cellSize);
+          }
+        }
+      }
+    }
+  }
+
+}


[02/12] hadoop git commit: HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from NameNode (Contributed by Vinayakumar B) Added missed file

Posted by zh...@apache.org.
HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from NameNode (Contributed by Vinayakumar B)
Added missed file


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75b32770
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75b32770
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75b32770

Branch: refs/heads/HDFS-7285
Commit: 75b32770fbc25f1c41dc7212966511ad5d7a481a
Parents: c11a02c
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Apr 8 14:23:03 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:18 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/protocol/ECInfo.java | 41 ++++++++++++++++++++
 1 file changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b32770/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
new file mode 100644
index 0000000..ca642c2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ECInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+/**
+ * Immutable pairing of a source string with the {@link ECSchema} that
+ * describes it; provides information, such as ECSchema, for a file/block.
+ */
+public class ECInfo {
+  private final ECSchema schema;
+  private final String src;
+
+  /**
+   * @param src the source this information refers to (presumably the
+   *            file path; the value is stored as given)
+   * @param schema the erasure coding schema associated with {@code src}
+   */
+  public ECInfo(String src, ECSchema schema) {
+    this.schema = schema;
+    this.src = src;
+  }
+
+  /** @return the schema passed to the constructor */
+  public ECSchema getSchema() {
+    return this.schema;
+  }
+
+  /** @return the source string passed to the constructor */
+  public String getSrc() {
+    return this.src;
+  }
+}


[07/12] hadoop git commit: HDFS-8074 Define a system-wide default EC schema. Contributed by Kai Zheng

Posted by zh...@apache.org.
HDFS-8074 Define a system-wide default EC schema. Contributed by Kai Zheng


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc11b5c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc11b5c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc11b5c2

Branch: refs/heads/HDFS-7285
Commit: cc11b5c2230ef10023066ed6c006fcd46af328bb
Parents: 75b3277
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Apr 9 01:30:02 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:19 2015 -0700

----------------------------------------------------------------------
 .../src/main/conf/ecschema-def.xml              |  5 --
 .../apache/hadoop/io/erasurecode/ECSchema.java  | 57 +++++++++++++++++-
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  4 +-
 .../hdfs/server/namenode/ECSchemaManager.java   | 62 ++++++++++++++++++++
 4 files changed, 120 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml b/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
index e619485..e36d386 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
+++ b/hadoop-common-project/hadoop-common/src/main/conf/ecschema-def.xml
@@ -27,11 +27,6 @@ You can modify and remove those not used yet, or add new ones.
 -->
 
 <schemas>
-  <schema name="RS-6-3">
-    <k>6</k>
-    <m>3</m>
-    <codec>RS</codec>
-  </schema>
   <schema name="RS-10-4">
     <k>10</k>
     <m>4</m>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 27be00e..8c3310e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -23,12 +23,12 @@ import java.util.Map;
 /**
  * Erasure coding schema to housekeeper relevant information.
  */
-public class ECSchema {
+public final class ECSchema {
   public static final String NUM_DATA_UNITS_KEY = "k";
   public static final String NUM_PARITY_UNITS_KEY = "m";
   public static final String CODEC_NAME_KEY = "codec";
   public static final String CHUNK_SIZE_KEY = "chunkSize";
-  public static final int DEFAULT_CHUNK_SIZE = 64 * 1024; // 64K
+  public static final int DEFAULT_CHUNK_SIZE = 256 * 1024; // 256K
 
   private String schemaName;
   private String codecName;
@@ -82,6 +82,18 @@ public class ECSchema {
   }
 
   /**
+   * Constructor with key parameters provided. Delegates to the constructor
+   * that also takes options, passing {@code null} for the options.
+   * @param schemaName name of the schema
+   * @param codecName name of the erasure codec
+   * @param numDataUnits number of data units (schema key "k")
+   * @param numParityUnits number of parity units (schema key "m")
+   */
+  public ECSchema(String schemaName, String codecName,
+                  int numDataUnits, int numParityUnits) {
+    this(schemaName, codecName, numDataUnits, numParityUnits, null);
+  }
+
+  /**
    * Constructor with key parameters provided. Note the options may contain
    * additional information for the erasure codec to interpret further.
    * @param schemaName
@@ -200,4 +212,45 @@ public class ECSchema {
 
     return sb.toString();
   }
+
+  /**
+   * Two schemas are equal when they are of the same class and agree on the
+   * unit counts, chunk size, schema name, codec name and options.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    final ECSchema that = (ECSchema) o;
+    // Numeric fields first, then the object-valued ones, short-circuiting
+    // on the first mismatch.
+    return numDataUnits == that.numDataUnits
+        && numParityUnits == that.numParityUnits
+        && chunkSize == that.chunkSize
+        && schemaName.equals(that.schemaName)
+        && codecName.equals(that.codecName)
+        && options.equals(that.options);
+  }
+
+  /**
+   * Combines the same fields that {@link #equals(Object)} compares, folding
+   * them with the multiplier 31.
+   */
+  @Override
+  public int hashCode() {
+    final int[] parts = {
+        schemaName.hashCode(),
+        codecName.hashCode(),
+        options.hashCode(),
+        numDataUnits,
+        numParityUnits,
+        chunkSize
+    };
+    // Starting from 0 leaves the first part unchanged (31 * 0 + part).
+    int hash = 0;
+    for (int part : parts) {
+      hash = 31 * hash + part;
+    }
+    return hash;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 7423033..5078a15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -52,4 +52,6 @@
     manage EC zones (Zhe Zhang)
 
     HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
-    NameNode (vinayakumarb)
\ No newline at end of file
+    NameNode (vinayakumarb)
+    
+    HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc11b5c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
new file mode 100644
index 0000000..b001c57
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ECSchemaManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+/**
+ * This manages EC schemas predefined and activated in the system. It loads from
+ * predefined ones in XML and syncs with persisted ones in NameNode image.
+ *
+ * This class is instantiated by the FSNamesystem.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public final class ECSchemaManager {
+
+  private static final int DEFAULT_DATA_BLOCKS = 6;
+  private static final int DEFAULT_PARITY_BLOCKS = 3;
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private static final String DEFAULT_SCHEMA_NAME = "SYS-DEFAULT-RS-6-3";
+
+  private static ECSchema SYS_DEFAULT_SCHEMA = new ECSchema(DEFAULT_SCHEMA_NAME,
+      DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS);
+
+  /**
+   * Get system-wide default EC schema, which can be used by default when no
+   * schema is specified for an EC zone.
+   * @return schema
+   */
+  public static ECSchema getSystemDefaultSchema() {
+    return SYS_DEFAULT_SCHEMA;
+  }
+
+  /**
+   * Tell whether the specified schema is the system default one or not.
+   * @param schema
+   * @return true if it's the default, false otherwise
+   */
+  public static boolean isSystemDefault(ECSchema schema) {
+    if (schema == null) {
+      throw new IllegalArgumentException("Invalid schema parameter");
+    }
+
+    // schema name is the identifier, but for safety we check all properties.
+    return SYS_DEFAULT_SCHEMA.equals(schema);
+  }
+}


[03/12] hadoop git commit: HDFS-8023. Erasure Coding: retrieve erasure coding schema for a file from NameNode (Contributed by Vinayakumar B)

Posted by zh...@apache.org.
HDFS-8023. Erasure Coding: retrieve erasure coding schema for a file from NameNode (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c11a02c5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c11a02c5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c11a02c5

Branch: refs/heads/HDFS-7285
Commit: c11a02c5f96bfa9db36abc63b19716e3da7d0735
Parents: ae8f0c1
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Apr 8 12:48:59 2015 +0530
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:18 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  5 ++-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 14 ++++++
 .../hadoop/hdfs/protocol/ClientProtocol.java    | 10 +++++
 ...tNamenodeProtocolServerSideTranslatorPB.java | 19 ++++++++
 .../ClientNamenodeProtocolTranslatorPB.java     | 18 ++++++++
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 47 +++++++++++++++++++-
 .../hdfs/server/namenode/FSNamesystem.java      | 31 +++++++++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java |  7 +++
 .../src/main/proto/ClientNamenodeProtocol.proto | 10 +++++
 .../hadoop-hdfs/src/main/proto/hdfs.proto       | 28 ++++++++++++
 .../hadoop/hdfs/TestErasureCodingZones.java     | 38 +++++++++++++++-
 11 files changed, 223 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9927ccf..7423033 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -49,4 +49,7 @@
     (Hui Zheng via Zhe Zhang)
 
     HDFS-7839. Erasure coding: implement facilities in NameNode to create and
-    manage EC zones (Zhe Zhang)
\ No newline at end of file
+    manage EC zones (Zhe Zhang)
+
+    HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from
+    NameNode (vinayakumarb)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2ef1d36..b2a69dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -117,6 +117,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -3091,6 +3092,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  public ECInfo getErasureCodingInfo(String src) throws IOException {
+    checkOpen();
+    TraceScope scope = getPathTraceScope("getErasureCodingInfo", src);
+    try {
+      return namenode.getErasureCodingInfo(src);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          FileNotFoundException.class, UnresolvedPathException.class);
+    } finally {
+      scope.close();
+    }
+  }
+
   public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
     return new DFSInotifyEventInputStream(traceSampler, namenode);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 8efe344..45d92f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1464,4 +1464,14 @@ public interface ClientProtocol {
    */
   @Idempotent
   public EventBatchList getEditsFromTxid(long txid) throws IOException;
+
+  /**
+   * Gets the ECInfo for the specified file/directory
+   * 
+   * @param src
+   * @return Returns the ECInfo if the file/directory is erasure coded, null otherwise
+   * @throws IOException
+   */
+  @Idempotent
+  public ECInfo getErasureCodingInfo(String src) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index a209539..1493e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -107,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1511,4 +1514,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetErasureCodingInfoResponseProto getErasureCodingInfo(RpcController controller,
+      GetErasureCodingInfoRequestProto request) throws ServiceException {
+    try {
+      ECInfo ecInfo = server.getErasureCodingInfo(request.getSrc());
+      GetErasureCodingInfoResponseProto.Builder resBuilder = GetErasureCodingInfoResponseProto
+          .newBuilder();
+      if (ecInfo != null) {
+        resBuilder.setECInfo(PBHelper.convertECInfo(ecInfo));
+      }
+      return resBuilder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 43a0322..568da68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -107,6 +108,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDat
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetErasureCodingInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@@ -1532,4 +1535,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public ECInfo getErasureCodingInfo(String src) throws IOException {
+    GetErasureCodingInfoRequestProto req = GetErasureCodingInfoRequestProto.newBuilder()
+        .setSrc(src).build();
+    try {
+      GetErasureCodingInfoResponseProto res = rpcProxy.getErasureCodingInfo(null, req);
+      if (res.hasECInfo()) {
+        return PBHelper.convertECInfo(res.getECInfo());
+      }
+      return null;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index cda708f..afdb302 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -29,7 +29,11 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -76,6 +80,7 @@ import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -112,7 +117,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -146,6 +150,9 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto.StorageState;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto;
@@ -228,6 +235,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -3097,4 +3105,41 @@ public class PBHelper {
         setId(context.getReportId()).
         build();
   }
+
+  public static ECInfo convertECInfo(ECInfoProto ecInfoProto) {
+    return new ECInfo(ecInfoProto.getSrc(),
+        convertECSchema(ecInfoProto.getSchema()));
+  }
+
+  public static ECInfoProto convertECInfo(ECInfo ecInfo) {
+    return ECInfoProto.newBuilder().setSrc(ecInfo.getSrc())
+        .setSchema(convertECSchema(ecInfo.getSchema())).build();
+  }
+
+  public static ECSchema convertECSchema(ECSchemaProto schema) {
+    List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+    Map<String, String> options = new HashMap<>(optionsList.size());
+    for (ECSchemaOptionEntryProto option : optionsList) {
+      options.put(option.getKey(), option.getValue());
+    }
+    // include chunksize in options.
+    options.put(ECSchema.CHUNK_SIZE_KEY, String.valueOf(schema.getChunkSize()));
+    return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
+        schema.getDataUnits(), schema.getParityUnits(), options);
+  }
+
+  public static ECSchemaProto convertECSchema(ECSchema schema) {
+    ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+        .setSchemaName(schema.getSchemaName())
+        .setCodecName(schema.getCodecName())
+        .setDataUnits(schema.getNumDataUnits())
+        .setParityUnits(schema.getNumParityUnits())
+        .setChunkSize(schema.getChunkSize());
+    Set<Entry<String, String>> entrySet = schema.getOptions().entrySet();
+    for (Entry<String, String> entry : entrySet) {
+      builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+          .setKey(entry.getKey()).setValue(entry.getValue()).build());
+    }
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 79d6f21..4c86bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -180,6 +180,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -261,6 +262,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.Server;
@@ -8138,6 +8140,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, "createErasureCodingZone", srcArg, null, resultingStat);
   }
 
+  /**
+   * Get the erasure coding information for specified src
+   */
+  ECInfo getErasureCodingInfo(String src) throws AccessControlException,
+      UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(src);
+    final FSPermissionChecker pc = getPermissionChecker();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = dir.resolvePath(pc, src, pathComponents);
+      final INodesInPath iip = dir.getINodesInPath(src, true);
+      if (isPermissionEnabled) {
+        dir.checkPathAccess(pc, iip, FsAction.READ);
+      }
+      if (dir.getECPolicy(iip)) {
+        // TODO HDFS-8074 and HDFS-7859 : To get from loaded schemas
+        Map<String, String> options = new HashMap<String, String>();
+        ECSchema defaultSchema = new ECSchema("RS-6-3", "rs", 6, 3, options);
+        return new ECInfo(src, defaultSchema);
+      }
+    } finally {
+      readUnlock();
+    }
+    return null;
+  }
+
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index ce24662..fe9b901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
@@ -2042,4 +2043,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.checkSuperuserPrivilege();
     nn.spanReceiverHost.removeSpanReceiver(id);
   }
+
+  @Override // ClientNameNodeProtocol
+  public ECInfo getErasureCodingInfo(String src) throws IOException {
+    checkNNStartup();
+    return namesystem.getErasureCodingInfo(src);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 183aff8..9488aed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -721,6 +721,14 @@ message CreateErasureCodingZoneRequestProto {
 message CreateErasureCodingZoneResponseProto {
 }
 
+message GetErasureCodingInfoRequestProto {
+  required string src = 1;
+}
+
+message GetErasureCodingInfoResponseProto {
+  optional ECInfoProto ECInfo = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -869,4 +877,6 @@ service ClientNamenodeProtocol {
       returns(GetCurrentEditLogTxidResponseProto);
   rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
       returns(GetEditsFromTxidResponseProto);
+  rpc getErasureCodingInfo(GetErasureCodingInfoRequestProto)
+      returns(GetErasureCodingInfoResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 67e2058..1314ea0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -620,3 +620,31 @@ message RollingUpgradeStatusProto {
   required string blockPoolId = 1;
   optional bool finalized = 2 [default = false];
 }
+
+/**
+ * ECSchema options entry
+ */
+message ECSchemaOptionEntryProto {
+  required string key = 1;
+  required string value = 2;
+}
+
+/**
+ * ECSchema for erasure coding
+ */
+message ECSchemaProto {
+  required string schemaName = 1;
+  required string codecName = 2;
+  required uint32 dataUnits = 3;
+  required uint32 parityUnits = 4;
+  required uint32 chunkSize = 5;
+  repeated ECSchemaOptionEntryProto options = 6;
+}
+
+/**
+ * ECInfo
+ */
+message ECInfoProto {
+ required string src = 1;
+ required ECSchemaProto schema = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c11a02c5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
index 49f08eef..bdca915 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,8 +31,7 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class TestErasureCodingZones {
   private final int NUM_OF_DATANODES = 3;
@@ -148,4 +149,37 @@ public class TestErasureCodingZones {
           "destination have different erasure coding policies", e);
     }
   }
+
+  @Test
+  public void testGetErasureCodingInfo() throws Exception {
+    String src = "/ec";
+    final Path ecDir = new Path(src);
+    fs.mkdir(ecDir, FsPermission.getDirDefault());
+    // dir ECInfo before creating ec zone
+    assertNull(fs.getClient().getErasureCodingInfo(src));
+    // dir ECInfo after creating ec zone
+    fs.getClient().createErasureCodingZone(src);
+    verifyErasureCodingInfo(src);
+    fs.create(new Path(ecDir, "/child1")).close();
+    // verify for the files in ec zone
+    verifyErasureCodingInfo(src + "/child1");
+  }
+
+  private void verifyErasureCodingInfo(String src) throws IOException {
+    ECInfo ecInfo = fs.getClient().getErasureCodingInfo(src);
+    assertNotNull("ECInfo should have been non-null", ecInfo);
+    assertEquals(src, ecInfo.getSrc());
+    ECSchema schema = ecInfo.getSchema();
+    assertNotNull(schema);
+    assertEquals("Default schema should be returned", "RS-6-3",
+        schema.getSchemaName());
+    assertEquals("Default codec(rs) should be returned", "rs",
+        schema.getCodecName());
+    assertEquals("Default numDataUnits should be used", 6,
+        schema.getNumDataUnits());
+    assertEquals("Default numParityUnits should be used", 3,
+        schema.getNumParityUnits());
+    assertEquals("Default chunkSize should be used",
+        ECSchema.DEFAULT_CHUNK_SIZE, schema.getChunkSize());
+  }
 }
\ No newline at end of file


[09/12] hadoop git commit: HDFS-7936. Erasure coding: resolving conflicts in the branch when merging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.

Posted by zh...@apache.org.
HDFS-7936. Erasure coding: resolving conflicts in the branch when merging trunk changes (this commit mainly addresses HDFS-8081 and HDFS-8048). Contributed by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f8992da8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f8992da8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f8992da8

Branch: refs/heads/HDFS-7285
Commit: f8992da86d8f15241e426fe7a5175e17b08680cf
Parents: 0656213
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon Apr 13 10:56:24 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Apr 13 14:08:20 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSInputStream.java |  4 ++--
 .../apache/hadoop/hdfs/DFSStripedInputStream.java   | 16 +++++++++-------
 .../apache/hadoop/hdfs/DFSStripedOutputStream.java  |  3 ++-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java   |  5 +++--
 .../hadoop/hdfs/TestDFSStripedOutputStream.java     |  3 ++-
 5 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e94c41a..6006693 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1099,7 +1099,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
     final int length = (int) (end - start + 1);
-    actualGetFromOneDataNode(datanode, block, start, end, buf,
+    actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
         new int[]{offset}, new int[]{length}, corruptedBlockMap);
   }
 
@@ -1118,7 +1118,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode,
-      LocatedBlock block, final long startInBlk, final long endInBlk,
+      long blockStartOffset, final long startInBlk, final long endInBlk,
       byte[] buf, int[] offsets, int[] lengths,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 077b0f8..8a431b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -224,7 +224,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * Real implementation of pread.
    */
   @Override
-  protected void fetchBlockByteRange(LocatedBlock block, long start,
+  protected void fetchBlockByteRange(long blockStartOffset, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -234,7 +234,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     int len = (int) (end - start + 1);
 
     // Refresh the striped block group
-    block = getBlockGroupAt(block.getStartOffset());
+    LocatedBlock block = getBlockGroupAt(blockStartOffset);
     assert block instanceof LocatedStripedBlock : "NameNode" +
         " should return a LocatedStripedBlock for a striped file";
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
@@ -254,9 +254,11 @@ public class DFSStripedInputStream extends DFSInputStream {
       DatanodeInfo loc = blks[i].getLocations()[0];
       StorageType type = blks[i].getStorageTypes()[0];
       DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
-          loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
-      Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
-          rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+          type);
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+          blks[i].getStartOffset(), rp.startOffsetInBlock,
+          rp.startOffsetInBlock + rp.readLength - 1, buf,
           rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
       Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
       DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@@ -272,7 +274,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
-      final LocatedBlock block, final long start, final long end,
+      final long blockStartOffset, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -283,7 +285,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         TraceScope scope =
             Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, block, start,
+          actualGetFromOneDataNode(datanode, blockStartOffset, start,
               end, buf, offsets, lengths, corruptedBlockMap);
         } finally {
           scope.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index aded4fe..1d0e1be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -284,7 +284,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
     for (StripedDataStreamer streamer : streamers) {
       streamer.setLastException(new IOException("Lease timeout of "
-          + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+          + (dfsClient.getConf().getHdfsTimeout()/1000) +
+          " seconds expired."));
     }
     closeThreads(true);
     dfsClient.endFileLease(fileId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a504907..6d89b0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3067,7 +3067,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long blockSize;
     final short numTargets;
     final byte storagePolicyID;
-    final boolean isStriped;
     Node clientNode = null;
     String clientMachine = null;
 
@@ -3109,7 +3108,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
           clientMachine);
       // TODO: make block group size configurable (HDFS-7337)
-      isStriped = pendingFile.isStriped();
+      boolean isStriped = pendingFile.isStriped();
       numTargets = isStriped ?
           HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
           pendingFile.getFileReplication();
@@ -3138,6 +3137,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
     Block newBlock = null;
     long offset;
+    boolean isStriped;
     checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
     writeLock();
@@ -3168,6 +3168,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       commitOrCompleteLastBlock(pendingFile, fileState.iip,
                                 ExtendedBlock.getLocalBlock(previous));
 
+      isStriped = pendingFile.isStriped();
       // allocate new block, record block locations in INode.
       newBlock = createNewBlock(isStriped);
       saveAllocatedBlock(src, fileState.iip, newBlock, targets,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8992da8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index f5a37f3..ee6998b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -241,7 +242,7 @@ public class TestDFSStripedOutputStream {
         }
 
         block.setNumBytes(lenOfBlock);
-        BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+        BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
             setFileName(src).
             setBlock(block).
             setBlockToken(lblock.getBlockToken()).