Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/05/11 21:42:46 UTC

[50/50] [abbrv] hadoop git commit: HDFS-11190. [READ] Namenode support for data stored in external stores.

HDFS-11190. [READ] Namenode support for data stored in external stores.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77dc3a0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77dc3a0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77dc3a0f

Branch: refs/heads/HDFS-9806
Commit: 77dc3a0f63bec431013a3c5f271ae6bc25e644af
Parents: b3bb531
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Apr 21 11:12:36 2017 -0700
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Thu May 11 14:41:16 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/LocatedBlock.java      |  96 ++++-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../blockmanagement/BlockFormatProvider.java    |  91 ++++
 .../server/blockmanagement/BlockManager.java    |  95 +++--
 .../server/blockmanagement/BlockProvider.java   |  65 +++
 .../BlockStoragePolicySuite.java                |   6 +
 .../blockmanagement/DatanodeDescriptor.java     |  34 +-
 .../server/blockmanagement/DatanodeManager.java |   2 +
 .../blockmanagement/DatanodeStorageInfo.java    |   4 +
 .../blockmanagement/LocatedBlockBuilder.java    | 109 +++++
 .../blockmanagement/ProvidedStorageMap.java     | 427 +++++++++++++++++++
 .../src/main/resources/hdfs-default.xml         |  30 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |   4 +
 .../blockmanagement/TestDatanodeManager.java    |  66 ++-
 .../TestNameNodeProvidedImplementation.java     | 345 +++++++++++++++
 15 files changed, 1293 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 85bec92..5ad0bca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -62,40 +63,50 @@ public class LocatedBlock {
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     // By default, startOffset is unknown(-1) and corrupt is false.
-    this(b, locs, null, null, -1, false, EMPTY_LOCS);
+    this(b, convert(locs, null, null), null, null, -1, false, EMPTY_LOCS);
   }
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
       String[] storageIDs, StorageType[] storageTypes) {
-    this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS);
+    this(b, convert(locs, storageIDs, storageTypes),
+         storageIDs, storageTypes, -1, false, EMPTY_LOCS);
   }
 
-  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
-      StorageType[] storageTypes, long startOffset,
+  public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
+      String[] storageIDs, StorageType[] storageTypes, long startOffset,
+      boolean corrupt, DatanodeInfo[] cachedLocs) {
+    this(b, convert(locs, storageIDs, storageTypes),
+        storageIDs, storageTypes, startOffset, corrupt,
+        null == cachedLocs || 0 == cachedLocs.length ? EMPTY_LOCS : cachedLocs);
+  }
+
+  public LocatedBlock(ExtendedBlock b, DatanodeInfoWithStorage[] locs,
+      String[] storageIDs, StorageType[] storageTypes, long startOffset,
       boolean corrupt, DatanodeInfo[] cachedLocs) {
     this.b = b;
     this.offset = startOffset;
     this.corrupt = corrupt;
-    if (locs==null) {
-      this.locs = EMPTY_LOCS;
-    } else {
-      this.locs = new DatanodeInfoWithStorage[locs.length];
-      for(int i = 0; i < locs.length; i++) {
-        DatanodeInfo di = locs[i];
-        DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
-            storageIDs != null ? storageIDs[i] : null,
-            storageTypes != null ? storageTypes[i] : null);
-        this.locs[i] = storage;
-      }
-    }
+    this.locs = null == locs ? EMPTY_LOCS : locs;
     this.storageIDs = storageIDs;
     this.storageTypes = storageTypes;
+    this.cachedLocs = null == cachedLocs || 0 == cachedLocs.length
+      ? EMPTY_LOCS
+      : cachedLocs;
+  }
+
+  private static DatanodeInfoWithStorage[] convert(
+      DatanodeInfo[] infos, String[] storageIDs, StorageType[] storageTypes) {
+    if (null == infos) {
+      return EMPTY_LOCS;
+    }
 
-    if (cachedLocs == null || cachedLocs.length == 0) {
-      this.cachedLocs = EMPTY_LOCS;
-    } else {
-      this.cachedLocs = cachedLocs;
+    DatanodeInfoWithStorage[] ret = new DatanodeInfoWithStorage[infos.length];
+    for(int i = 0; i < infos.length; i++) {
+      ret[i] = new DatanodeInfoWithStorage(infos[i],
+          storageIDs   != null ? storageIDs[i]   : null,
+          storageTypes != null ? storageTypes[i] : null);
     }
+    return ret;
   }
 
   public Token<BlockTokenIdentifier> getBlockToken() {
@@ -145,6 +156,51 @@ public class LocatedBlock {
     }
   }
 
+  /**
+   * Comparator that ensures that a PROVIDED storage type is greater than
+   * any other storage type. Any other storage types are considered equal.
+   */
+  private class ProvidedLastComparator
+    implements Comparator<DatanodeInfoWithStorage> {
+    @Override
+    public int compare(DatanodeInfoWithStorage dns1,
+        DatanodeInfoWithStorage dns2) {
+      if (StorageType.PROVIDED.equals(dns1.getStorageType())
+          && !StorageType.PROVIDED.equals(dns2.getStorageType())) {
+        return 1;
+      }
+      if (!StorageType.PROVIDED.equals(dns1.getStorageType())
+          && StorageType.PROVIDED.equals(dns2.getStorageType())) {
+        return -1;
+      }
+      // Storage types of dns1 and dns2 are now both provided or not provided;
+      // thus, are essentially equal for the purpose of this comparator.
+      return 0;
+    }
+  }
+
+  /**
+   * Moves all locations that have {@link StorageType}
+   * {@code PROVIDED} to the end of the locations array without
+   * changing the relative ordering of the remaining locations.
+   * Only the first {@code activeLen} locations are considered.
+   * The caller must immediately invoke {@link
+   * org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
+   * to update the cached Storage ID/Type arrays.
+   * @param activeLen the number of locations to consider, starting from index 0
+   */
+  public void moveProvidedToEnd(int activeLen) {
+
+    if (activeLen <= 0) {
+      return;
+    }
+    // as this is a stable sort, for elements that are equal,
+    // the current order of the elements is maintained
+    Arrays.sort(locs, 0,
+        (activeLen < locs.length) ? activeLen : locs.length,
+        new ProvidedLastComparator());
+  }
+
   public long getStartOffset() {
     return offset;
   }
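
The new moveProvidedToEnd relies on Arrays.sort being a stable merge sort for object arrays: a comparator that only distinguishes PROVIDED from everything else pushes provided locations to the back while leaving the order produced by the topology sort untouched. A minimal standalone sketch of that idea (the StorageType enum and Loc class below are simplified stand-ins, not the HDFS types):

import java.util.Arrays;
import java.util.Comparator;

public class ProvidedLastSortSketch {

  enum StorageType { DISK, SSD, PROVIDED }   // simplified stand-in

  static final class Loc {
    final String name; final StorageType type;
    Loc(String name, StorageType type) { this.name = name; this.type = type; }
    @Override public String toString() { return name + "/" + type; }
  }

  // PROVIDED sorts after everything else; all other types compare equal,
  // so a stable sort preserves their existing relative order.
  static final Comparator<Loc> PROVIDED_LAST =
      (a, b) -> Boolean.compare(a.type == StorageType.PROVIDED,
                                b.type == StorageType.PROVIDED);

  public static void main(String[] args) {
    Loc[] locs = {
        new Loc("dn1", StorageType.PROVIDED), new Loc("dn2", StorageType.DISK),
        new Loc("dn3", StorageType.SSD),      new Loc("dn4", StorageType.PROVIDED),
        new Loc("dn5", StorageType.DISK) };
    int activeLen = locs.length;  // only the first activeLen entries are considered
    Arrays.sort(locs, 0, Math.min(activeLen, locs.length), PROVIDED_LAST);
    System.out.println(Arrays.toString(locs));
    // prints: [dn2/DISK, dn3/SSD, dn5/DISK, dn1/PROVIDED, dn4/PROVIDED]
  }
}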

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6406d35..e252b6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -314,6 +314,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.edits.asynclogging";
   public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
 
+  public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
+  public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;
+
+  public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class";
+
   public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
   public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
   public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
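
Together with the existing dfs.provider.* keys, the two new keys are the switch that turns the feature on in the Namenode. A sketch of how a test or client might set them through the standard Configuration API (key names are taken from this patch; the values shown are the defaults, spelled out only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ProvidedConfSketch {
  public static Configuration providedConf() {
    Configuration conf = new Configuration();
    // Enable PROVIDED storage handling in the Namenode (disabled by default).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    // BlockFormatProvider is already the default block provider; set explicitly here.
    conf.set(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
        "org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider");
    // Storage ID shared by all provided replicas ("DS-PROVIDED" is the default).
    conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, "DS-PROVIDED");
    return conf;
  }
}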

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
new file mode 100644
index 0000000..930263d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.BlockAlias;
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads provided blocks from a {@link BlockFormat}.
+ */
+public class BlockFormatProvider extends BlockProvider
+    implements Configurable {
+
+  private Configuration conf;
+  private BlockFormat<? extends BlockAlias> blockFormat;
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockFormatProvider.class);
+
+  @Override
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void setConf(Configuration conf) {
+    Class<? extends BlockFormat> c = conf.getClass(
+        DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+        TextFileRegionFormat.class, BlockFormat.class);
+    blockFormat = ReflectionUtils.newInstance(c, conf);
+    LOG.info("Loaded BlockFormat class: " + c.getName());
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Block> iterator() {
+    try {
+      final BlockFormat.Reader<? extends BlockAlias> reader =
+          blockFormat.getReader(null);
+
+      return new Iterator<Block>() {
+
+        private final Iterator<? extends BlockAlias> inner = reader.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+
+        @Override
+        public Block next() {
+          return inner.next().getBlock();
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read provided blocks", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a9592bf..c5b9c8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -375,6 +375,9 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private final short minReplicationToBeInMaintenance;
 
+  /** Storages accessible from multiple DNs. */
+  private final ProvidedStorageMap providedStorageMap;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -407,6 +410,8 @@ public class BlockManager implements BlockStatsMXBean {
 
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
+    providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
+
     this.maxCorruptFilesReturned = conf.getInt(
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
@@ -1053,7 +1058,7 @@ public class BlockManager implements BlockStatsMXBean {
     final long fileLength = bc.computeContentSummary(
         getStoragePolicySuite()).getLength();
     final long pos = fileLength - lastBlock.getNumBytes();
-    return createLocatedBlock(lastBlock, pos,
+    return createLocatedBlock(null, lastBlock, pos,
         BlockTokenIdentifier.AccessMode.WRITE);
   }
 
@@ -1074,8 +1079,10 @@ public class BlockManager implements BlockStatsMXBean {
     return locations;
   }
 
-  private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
-      final long offset, final long length, final int nrBlocksToReturn,
+  private void createLocatedBlockList(
+      LocatedBlockBuilder locatedBlocks,
+      final BlockInfo[] blocks,
+      final long offset, final long length,
       final AccessMode mode) throws IOException {
     int curBlk;
     long curPos = 0, blkSize = 0;
@@ -1090,21 +1097,22 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return Collections.emptyList();
+      return;
 
     long endOff = offset + length;
-    List<LocatedBlock> results = new ArrayList<>(blocks.length);
     do {
-      results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
+      locatedBlocks.addBlock(
+          createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode));
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff 
           && curBlk < blocks.length
-          && results.size() < nrBlocksToReturn);
-    return results;
+          && !locatedBlocks.isBlockMax());
+    return;
   }
 
-  private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
+  private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+      final BlockInfo[] blocks,
       final long endPos, final AccessMode mode) throws IOException {
     int curBlk;
     long curPos = 0;
@@ -1117,12 +1125,13 @@ public class BlockManager implements BlockStatsMXBean {
       curPos += blkSize;
     }
     
-    return createLocatedBlock(blocks[curBlk], curPos, mode);
+    return createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode);
   }
 
-  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
-    final AccessMode mode) throws IOException {
-    final LocatedBlock lb = createLocatedBlock(blk, pos);
+  private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+      final BlockInfo blk, final long pos, final AccessMode mode)
+          throws IOException {
+    final LocatedBlock lb = createLocatedBlock(locatedBlocks, blk, pos);
     if (mode != null) {
       setBlockToken(lb, mode);
     }
@@ -1130,21 +1139,24 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /** @return a LocatedBlock for the given block */
-  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
-      throws IOException {
+  private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+      final BlockInfo blk, final long pos) throws IOException {
     if (!blk.isComplete()) {
       final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
       if (blk.isStriped()) {
         final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
         final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
             blk);
+        //TODO use locatedBlocks builder??
         return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
             false);
       } else {
         final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
         final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
             blk);
-        return newLocatedBlock(eb, storages, pos, false);
+        return null == locatedBlocks
+            ? newLocatedBlock(eb, storages, pos, false)
+                : locatedBlocks.newLocatedBlock(eb, storages, pos, false);
       }
     }
 
@@ -1208,9 +1220,10 @@ public class BlockManager implements BlockStatsMXBean {
       " numCorrupt: " + numCorruptNodes +
       " numCorruptRepls: " + numCorruptReplicas;
     final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
-    return blockIndices == null ?
-        newLocatedBlock(eb, machines, pos, isCorrupt) :
-        newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
+    return blockIndices == null
+        ? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt)
+            : locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt)
+        : newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
   }
 
   /** Create a LocatedBlocks. */
@@ -1232,27 +1245,31 @@ public class BlockManager implements BlockStatsMXBean {
         LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
       }
       final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
-      final List<LocatedBlock> locatedblocks = createLocatedBlockList(
-          blocks, offset, length, Integer.MAX_VALUE, mode);
 
-      final LocatedBlock lastlb;
-      final boolean isComplete;
+      LocatedBlockBuilder locatedBlocks = providedStorageMap
+          .newLocatedBlocks(Integer.MAX_VALUE)
+          .fileLength(fileSizeExcludeBlocksUnderConstruction)
+          .lastUC(isFileUnderConstruction)
+          .encryption(feInfo)
+          .erasureCoding(ecPolicy);
+
+      createLocatedBlockList(locatedBlocks, blocks, offset, length, mode);
       if (!inSnapshot) {
         final BlockInfo last = blocks[blocks.length - 1];
         final long lastPos = last.isComplete()?
             fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
             : fileSizeExcludeBlocksUnderConstruction;
-        lastlb = createLocatedBlock(last, lastPos, mode);
-        isComplete = last.isComplete();
+
+        locatedBlocks
+          .lastBlock(createLocatedBlock(locatedBlocks, last, lastPos, mode))
+          .lastComplete(last.isComplete());
       } else {
-        lastlb = createLocatedBlock(blocks,
-            fileSizeExcludeBlocksUnderConstruction, mode);
-        isComplete = true;
+        locatedBlocks
+          .lastBlock(createLocatedBlock(locatedBlocks, blocks,
+              fileSizeExcludeBlocksUnderConstruction, mode))
+          .lastComplete(true);
       }
-      LocatedBlocks locations = new LocatedBlocks(
-          fileSizeExcludeBlocksUnderConstruction,
-          isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
-          ecPolicy);
+      LocatedBlocks locations = locatedBlocks.build();
       // Set caching information for the located blocks.
       CacheManager cm = namesystem.getCacheManager();
       if (cm != null) {
@@ -2336,7 +2353,10 @@ public class BlockManager implements BlockStatsMXBean {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+      // !#! Register DN with provided storage, not with storage owned by DN
+      // !#! DN should still have a ref to the DNStorageInfo
+      DatanodeStorageInfo storageInfo =
+          providedStorageMap.getStorage(node, storage);
 
       if (storageInfo == null) {
         // We handle this for backwards compatibility.
@@ -2368,9 +2388,12 @@ public class BlockManager implements BlockStatsMXBean {
             nodeID.getDatanodeUuid());
         processFirstBlockReport(storageInfo, newReport);
       } else {
-        invalidatedBlocks = processReport(storageInfo, newReport, context);
+        // Block reports for provided storage are not
+        // maintained by DN heartbeats
+        if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
+          invalidatedBlocks = processReport(storageInfo, newReport, context);
+        }
       }
-      
       storageInfo.receivedBlockReport();
     } finally {
       endTime = Time.monotonicNow();
@@ -2589,7 +2612,7 @@ public class BlockManager implements BlockStatsMXBean {
    * @param report - the initial block report, to be processed
    * @throws IOException 
    */
-  private void processFirstBlockReport(
+  void processFirstBlockReport(
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
new file mode 100644
index 0000000..d8bed16
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to load provided blocks in the {@link BlockManager}.
+ */
+public abstract class BlockProvider implements Iterable<Block> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ProvidedStorageMap.class);
+
+  private RwLock lock;
+  private BlockManager bm;
+  private DatanodeStorageInfo storage;
+  private boolean hasDNs = false;
+
+  /**
+   * @param lock the namesystem lock
+   * @param bm block manager
+   * @param storage storage for provided blocks
+   */
+  void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
+    this.bm = bm;
+    this.lock = lock;
+    this.storage = storage;
+  }
+
+  /**
+   * Start the processing of block reports for provided blocks.
+   * @throws IOException
+   */
+  void start() throws IOException {
+    assert lock.hasWriteLock() : "Not holding write lock";
+    if (hasDNs) {
+      return;
+    }
+    LOG.info("Processing first block report from storage: " + storage);
+    // first pass; periodic refresh should call bm.processReport
+    bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
+    hasDNs = true;
+  }
+}
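
Because the provider class is pluggable through dfs.namenode.block.provider.class, the abstract class above is the extension point for sourcing block metadata from something other than a BlockFormat. A hypothetical example (not part of this patch) that reports a fixed set of blocks, shown only to illustrate the shape of the contract:

import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.hdfs.protocol.Block;

/** Hypothetical sketch; a real provider would read an external catalog. */
public class StaticBlockProvider extends BlockProvider {

  private static final Block[] BLOCKS = {
      new Block(1L, 1024L, 1001L),   // blockId, numBytes, generationStamp
      new Block(2L, 2048L, 1001L),
  };

  @Override
  public Iterator<Block> iterator() {
    return Arrays.asList(BLOCKS).iterator();
  }
}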

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
index c8923da..6ea5198 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
@@ -82,6 +82,12 @@ public class BlockStoragePolicySuite {
         HdfsConstants.COLD_STORAGE_POLICY_NAME,
         new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
         StorageType.EMPTY_ARRAY);
+    final byte providedId = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
+    policies[providedId] = new BlockStoragePolicy(providedId,
+      HdfsConstants.PROVIDED_STORAGE_POLICY_NAME,
+      new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+      new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+      new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
     return new BlockStoragePolicySuite(hotId, policies);
   }
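
The new PROVIDED policy places replicas on PROVIDED then DISK storage, and uses the same pair for creation and replication fallbacks. Assuming the policy name is the "PROVIDED" string shown in the TestBlockStoragePolicy change below, a client could pin a directory to it through the generic FileSystem API; a sketch (the path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetProvidedPolicySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      Path dir = new Path("/remote-data");   // hypothetical path
      // "PROVIDED" corresponds to HdfsConstants.PROVIDED_STORAGE_POLICY_NAME in this patch.
      fs.setStoragePolicy(dir, "PROVIDED");
      System.out.println(fs.getStoragePolicy(dir));
    }
  }
}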
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 4b87fd4..ab16f1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -151,7 +151,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private final LeavingServiceStatus leavingServiceStatus =
       new LeavingServiceStatus();
 
-  private final Map<String, DatanodeStorageInfo> storageMap =
+  protected final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
 
   /**
@@ -322,6 +322,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
   boolean hasStaleStorages() {
     synchronized (storageMap) {
       for (DatanodeStorageInfo storage : storageMap.values()) {
+        if (StorageType.PROVIDED.equals(storage.getStorageType())) {
+          // to verify provided storage participated in this hb, requires
+          // check to pass DNDesc.
+          // e.g., storageInfo.verifyBlockReportId(this, curBlockReportId)
+          continue;
+        }
         if (storage.areBlockContentsStale()) {
           return true;
         }
@@ -439,17 +445,22 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.volumeFailures = volFailures;
     this.volumeFailureSummary = volumeFailureSummary;
     for (StorageReport report : reports) {
+      totalCapacity += report.getCapacity();
+      totalRemaining += report.getRemaining();
+      totalBlockPoolUsed += report.getBlockPoolUsed();
+      totalDfsUsed += report.getDfsUsed();
+      totalNonDfsUsed += report.getNonDfsUsed();
+
+      if (StorageType.PROVIDED.equals(
+          report.getStorage().getStorageType())) {
+        continue;
+      }
       DatanodeStorageInfo storage = updateStorage(report.getStorage());
       if (checkFailedStorages) {
         failedStorageInfos.remove(storage);
       }
 
       storage.receivedHeartbeat(report);
-      totalCapacity += report.getCapacity();
-      totalRemaining += report.getRemaining();
-      totalBlockPoolUsed += report.getBlockPoolUsed();
-      totalDfsUsed += report.getDfsUsed();
-      totalNonDfsUsed += report.getNonDfsUsed();
     }
     rollBlocksScheduled(getLastUpdateMonotonic());
 
@@ -471,6 +482,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  void injectStorage(DatanodeStorageInfo s) {
+    synchronized (storageMap) {
+      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      if (null == storage) {
+        storageMap.put(s.getStorageID(), s);
+      } else {
+        assert storage == s : "found " + storage + " expected " + s;
+      }
+    }
+  }
+
   /**
    * Remove stale storages from storageMap. We must not remove any storages
    * as long as they have associated block replicas.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 7dcc9fd..038aaf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -525,6 +525,8 @@ public class DatanodeManager {
     } else {
       networktopology.sortByDistance(client, lb.getLocations(), activeLen);
     }
+    //move PROVIDED storage to the end to prefer local replicas.
+    lb.moveProvidedToEnd(activeLen);
     // must update cache since we modified locations array
     lb.updateCachedStorageInfo();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8af86d3..ed3905f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -172,6 +172,10 @@ public class DatanodeStorageInfo {
     this.state = state;
   }
 
+  void setHeartbeatedSinceFailover(boolean value) {
+    heartbeatedSinceFailover = value;
+  }
+
   boolean areBlocksOnFailedStorage() {
     return getState() == State.FAILED && !blocks.isEmpty();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
new file mode 100644
index 0000000..0056887
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class LocatedBlockBuilder {
+
+  protected long flen;
+  protected List<LocatedBlock> blocks = Collections.<LocatedBlock>emptyList();
+  protected boolean isUC;
+  protected LocatedBlock last;
+  protected boolean lastComplete;
+  protected FileEncryptionInfo feInfo;
+  private final int maxBlocks;
+  protected ErasureCodingPolicy ecPolicy;
+
+  LocatedBlockBuilder(int maxBlocks) {
+    this.maxBlocks = maxBlocks;
+  }
+
+  boolean isBlockMax() {
+    return blocks.size() >= maxBlocks;
+  }
+
+  LocatedBlockBuilder fileLength(long fileLength) {
+    flen = fileLength;
+    return this;
+  }
+
+  LocatedBlockBuilder addBlock(LocatedBlock block) {
+    if (blocks.isEmpty()) {
+      blocks = new ArrayList<>();
+    }
+    blocks.add(block);
+    return this;
+  }
+
+  // return new block so tokens can be set
+  LocatedBlock newLocatedBlock(ExtendedBlock eb,
+      DatanodeStorageInfo[] storage,
+      long pos, boolean isCorrupt) {
+    LocatedBlock blk =
+        BlockManager.newLocatedBlock(eb, storage, pos, isCorrupt);
+    return blk;
+  }
+
+  LocatedBlockBuilder lastUC(boolean underConstruction) {
+    isUC = underConstruction;
+    return this;
+  }
+
+  LocatedBlockBuilder lastBlock(LocatedBlock block) {
+    last = block;
+    return this;
+  }
+
+  LocatedBlockBuilder lastComplete(boolean complete) {
+    lastComplete = complete;
+    return this;
+  }
+
+  LocatedBlockBuilder encryption(FileEncryptionInfo fileEncryptionInfo) {
+    feInfo = fileEncryptionInfo;
+    return this;
+  }
+
+  LocatedBlockBuilder erasureCoding(ErasureCodingPolicy codingPolicy) {
+    ecPolicy = codingPolicy;
+    return this;
+  }
+
+  LocatedBlocks build(DatanodeDescriptor client) {
+    return build();
+  }
+
+  LocatedBlocks build() {
+    return new LocatedBlocks(flen, isUC, blocks, last,
+        lastComplete, feInfo, ecPolicy);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
new file mode 100644
index 0000000..d222344
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * This class allows us to manage and multiplex between storages local to
+ * datanodes, and provided storage.
+ */
+public class ProvidedStorageMap {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ProvidedStorageMap.class);
+
+  // limit to a single provider for now
+  private final BlockProvider blockProvider;
+  private final String storageId;
+  private final ProvidedDescriptor providedDescriptor;
+  private final DatanodeStorageInfo providedStorageInfo;
+  private boolean providedEnabled;
+
+  ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
+      throws IOException {
+
+    storageId = conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+        DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
+
+    providedEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
+        DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT);
+
+    if (!providedEnabled) {
+      // disable mapping
+      blockProvider = null;
+      providedDescriptor = null;
+      providedStorageInfo = null;
+      return;
+    }
+
+    DatanodeStorage ds = new DatanodeStorage(
+        storageId, State.NORMAL, StorageType.PROVIDED);
+    providedDescriptor = new ProvidedDescriptor();
+    providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
+
+    // load block reader into storage
+    Class<? extends BlockProvider> fmt = conf.getClass(
+        DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
+        BlockFormatProvider.class, BlockProvider.class);
+
+    blockProvider = ReflectionUtils.newInstance(fmt, conf);
+    blockProvider.init(lock, bm, providedStorageInfo);
+    LOG.info("Loaded block provider class: " +
+        blockProvider.getClass() + " storage: " + providedStorageInfo);
+  }
+
+  /**
+   * @param dn datanode descriptor
+   * @param s data node storage
+   * @return the {@link DatanodeStorageInfo} for the specified datanode.
+   * If {@code s} corresponds to a provided storage, the storage info
+   * representing provided storage is returned.
+   * @throws IOException
+   */
+  DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
+      throws IOException {
+    if (providedEnabled && storageId.equals(s.getStorageID())) {
+      if (StorageType.PROVIDED.equals(s.getStorageType())) {
+        // poll service, initiate
+        blockProvider.start();
+        dn.injectStorage(providedStorageInfo);
+        return providedDescriptor.getProvidedStorage(dn, s);
+      }
+      LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn);
+    }
+    return dn.getStorageInfo(s.getStorageID());
+  }
+
+  public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
+    if (!providedEnabled) {
+      return new LocatedBlockBuilder(maxValue);
+    }
+    return new ProvidedBlocksBuilder(maxValue);
+  }
+
+  /**
+   * Builder used for creating {@link LocatedBlocks} when a block is provided.
+   */
+  class ProvidedBlocksBuilder extends LocatedBlockBuilder {
+
+    private ShadowDatanodeInfoWithStorage pending;
+
+    ProvidedBlocksBuilder(int maxBlocks) {
+      super(maxBlocks);
+      pending = new ShadowDatanodeInfoWithStorage(
+          providedDescriptor, storageId);
+    }
+
+    @Override
+    LocatedBlock newLocatedBlock(ExtendedBlock eb,
+        DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
+
+      DatanodeInfoWithStorage[] locs =
+        new DatanodeInfoWithStorage[storages.length];
+      String[] sids = new String[storages.length];
+      StorageType[] types = new StorageType[storages.length];
+      for (int i = 0; i < storages.length; ++i) {
+        sids[i] = storages[i].getStorageID();
+        types[i] = storages[i].getStorageType();
+        if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
+          locs[i] = pending;
+        } else {
+          locs[i] = new DatanodeInfoWithStorage(
+              storages[i].getDatanodeDescriptor(), sids[i], types[i]);
+        }
+      }
+      return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
+    }
+
+    @Override
+    LocatedBlocks build(DatanodeDescriptor client) {
+      // TODO: to support multiple provided storages, need to pass/maintain map
+      // set all fields of pending DatanodeInfo
+      List<String> excludedUUids = new ArrayList<String>();
+      for (LocatedBlock b: blocks) {
+        DatanodeInfo[] infos = b.getLocations();
+        StorageType[] types = b.getStorageTypes();
+
+        for (int i = 0; i < types.length; i++) {
+          if (!StorageType.PROVIDED.equals(types[i])) {
+            excludedUUids.add(infos[i].getDatanodeUuid());
+          }
+        }
+      }
+
+      DatanodeDescriptor dn = providedDescriptor.choose(client, excludedUUids);
+      if (dn == null) {
+        dn = providedDescriptor.choose(client);
+      }
+
+      pending.replaceInternal(dn);
+      return new LocatedBlocks(
+          flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
+    }
+
+    @Override
+    LocatedBlocks build() {
+      return build(providedDescriptor.chooseRandom());
+    }
+  }
+
+  /**
+   * An abstract {@link DatanodeInfoWithStorage} to represent provided storage.
+   */
+  static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
+    private String shadowUuid;
+
+    ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
+      super(d, storageId, StorageType.PROVIDED);
+    }
+
+    @Override
+    public String getDatanodeUuid() {
+      return shadowUuid;
+    }
+
+    public void setDatanodeUuid(String uuid) {
+      shadowUuid = uuid;
+    }
+
+    void replaceInternal(DatanodeDescriptor dn) {
+      updateRegInfo(dn); // overwrite DatanodeID (except UUID)
+      setDatanodeUuid(dn.getDatanodeUuid());
+      setCapacity(dn.getCapacity());
+      setDfsUsed(dn.getDfsUsed());
+      setRemaining(dn.getRemaining());
+      setBlockPoolUsed(dn.getBlockPoolUsed());
+      setCacheCapacity(dn.getCacheCapacity());
+      setCacheUsed(dn.getCacheUsed());
+      setLastUpdate(dn.getLastUpdate());
+      setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
+      setXceiverCount(dn.getXceiverCount());
+      setNetworkLocation(dn.getNetworkLocation());
+      adminState = dn.getAdminState();
+      setUpgradeDomain(dn.getUpgradeDomain());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
+
+  /**
+   * An abstract DatanodeDescriptor to track datanodes with provided storages.
+   * NOTE: never resolved through registerDatanode, so not in the topology.
+   */
+  static class ProvidedDescriptor extends DatanodeDescriptor {
+
+    private final NavigableMap<String, DatanodeDescriptor> dns =
+        new ConcurrentSkipListMap<>();
+
+    ProvidedDescriptor() {
+      super(new DatanodeID(
+            null,                         // String ipAddr,
+            null,                         // String hostName,
+            UUID.randomUUID().toString(), // String datanodeUuid,
+            0,                            // int xferPort,
+            0,                            // int infoPort,
+            0,                            // int infoSecurePort,
+            0));                          // int ipcPort
+    }
+
+    DatanodeStorageInfo getProvidedStorage(
+        DatanodeDescriptor dn, DatanodeStorage s) {
+      dns.put(dn.getDatanodeUuid(), dn);
+      // TODO: maintain separate RPC ident per dn
+      return storageMap.get(s.getStorageID());
+    }
+
+    DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
+      assert null == storageMap.get(ds.getStorageID());
+      DatanodeStorageInfo storage = new DatanodeStorageInfo(this, ds);
+      storage.setHeartbeatedSinceFailover(true);
+      storageMap.put(storage.getStorageID(), storage);
+      return storage;
+    }
+
+    DatanodeDescriptor choose(DatanodeDescriptor client) {
+      // exact match for now
+      DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+      if (null == dn) {
+        dn = chooseRandom();
+      }
+      return dn;
+    }
+
+    DatanodeDescriptor choose(DatanodeDescriptor client,
+        List<String> excludedUUids) {
+      // exact match for now
+      DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+
+      if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
+        dn = null;
+        Set<String> exploredUUids = new HashSet<String>();
+
+        while(exploredUUids.size() < dns.size()) {
+          Map.Entry<String, DatanodeDescriptor> d =
+                  dns.ceilingEntry(UUID.randomUUID().toString());
+          if (null == d) {
+            d = dns.firstEntry();
+          }
+          String uuid = d.getValue().getDatanodeUuid();
+          //this node has already been explored, and was not selected earlier
+          if (exploredUUids.contains(uuid)) {
+            continue;
+          }
+          exploredUUids.add(uuid);
+          //this node has been excluded
+          if (excludedUUids.contains(uuid)) {
+            continue;
+          }
+          return dns.get(uuid);
+        }
+      }
+
+      return dn;
+    }
+
+    DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
+      // TODO: Currently this is not uniformly random;
+      // skewed toward sparse sections of the ids
+      Set<DatanodeDescriptor> excludedNodes =
+          new HashSet<DatanodeDescriptor>();
+      if (excludedStorages != null) {
+        for (int i= 0; i < excludedStorages.length; i++) {
+          LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
+          excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
+        }
+      }
+      Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
+
+      while(exploredNodes.size() < dns.size()) {
+        Map.Entry<String, DatanodeDescriptor> d =
+            dns.ceilingEntry(UUID.randomUUID().toString());
+        if (null == d) {
+          d = dns.firstEntry();
+        }
+        DatanodeDescriptor node = d.getValue();
+        //this node has already been explored, and was not selected earlier
+        if (exploredNodes.contains(node)) {
+          continue;
+        }
+        exploredNodes.add(node);
+        //this node has been excluded
+        if (excludedNodes.contains(node)) {
+          continue;
+        }
+        return node;
+      }
+      return null;
+    }
+
+    DatanodeDescriptor chooseRandom() {
+      return chooseRandom(null);
+    }
+
+    @Override
+    void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+      // pick a random datanode, delegate to it
+      DatanodeDescriptor node = chooseRandom(targets);
+      if (node != null) {
+        node.addBlockToBeReplicated(block, targets);
+      } else {
+        LOG.error("Cannot find a source node to replicate block: "
+            + block + " from");
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (this == obj) || super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
+
+  /**
+   * Used to emulate block reports for provided blocks.
+   */
+  static class ProvidedBlockList extends BlockListAsLongs {
+
+    private final Iterator<Block> inner;
+
+    ProvidedBlockList(Iterator<Block> inner) {
+      this.inner = inner;
+    }
+
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return new Iterator<BlockReportReplica>() {
+        @Override
+        public BlockReportReplica next() {
+          return new BlockReportReplica(inner.next());
+        }
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+    @Override
+    public int getNumberOfBlocks() {
+      // VERIFY: only printed for debugging
+      return -1;
+    }
+
+    @Override
+    public ByteString getBlocksBuffer() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long[] getBlockListAsLongs() {
+      // should only be used for backwards compat, DN.ver > NN.ver
+      throw new UnsupportedOperationException();
+    }
+  }
+}
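
ProvidedDescriptor.chooseRandom picks a datanode by calling ceilingEntry on a random UUID in the ConcurrentSkipListMap, wrapping to firstEntry when the probe lands past the last key; as the TODO notes, this skews toward nodes whose keys follow large gaps in the UUID space rather than being uniformly random. A standalone sketch of just that selection trick, using plain strings in place of DatanodeDescriptors:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListRandomPickSketch {
  public static void main(String[] args) {
    ConcurrentSkipListMap<String, String> dns = new ConcurrentSkipListMap<>();
    for (int i = 0; i < 5; i++) {
      dns.put(UUID.randomUUID().toString(), "dn-" + i);
    }
    // Probe with a random key; ceilingEntry returns the next key at or above it,
    // wrapping around to firstEntry when the probe is larger than every key.
    Map.Entry<String, String> e = dns.ceilingEntry(UUID.randomUUID().toString());
    if (e == null) {
      e = dns.firstEntry();
    }
    // Each node is chosen when the probe falls in the gap below its key, so nodes
    // after large gaps win more often; this is the skew the TODO calls out.
    System.out.println("picked " + e.getValue());
  }
}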

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 6df243f..0b8063c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4461,14 +4461,30 @@
   </property>
 
   <property>
+    <name>dfs.namenode.provided.enabled</name>
+    <value>false</value>
+    <description>
+      Enables the Namenode to handle provided storages.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.namenode.block.provider.class</name>
+    <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value>
+    <description>
+      The class that is used to load provided blocks in the Namenode.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.provider.class</name>
     <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
     <description>
-        The class that is used to load information about blocks stored in
-        provided storages.
-        org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
-        is used as the default, which expects the blocks to be specified
-        using a delimited text file.
+      The class that is used to load information about blocks stored in
+      provided storages.
+      org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
+      is used as the default, which expects the blocks to be specified
+      using a delimited text file.
     </description>
   </property>
 
@@ -4476,7 +4492,7 @@
     <name>dfs.provided.df.class</name>
     <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
     <description>
-        The class that is used to measure usage statistics of provided stores.
+      The class that is used to measure usage statistics of provided stores.
     </description>
   </property>
 
@@ -4484,7 +4500,7 @@
     <name>dfs.provided.storage.id</name>
     <value>DS-PROVIDED</value>
     <description>
-        The storage ID used for provided stores.
+      The storage ID used for provided stores.
     </description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 3a8fb59..12045a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -84,6 +84,7 @@ public class TestBlockStoragePolicy {
   static final byte ONESSD  = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
   static final byte ALLSSD  = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
   static final byte LAZY_PERSIST  = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
+  static final byte PROVIDED  = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
 
   @Test (timeout=300000)
   public void testConfigKeyEnabled() throws IOException {
@@ -143,6 +144,9 @@ public class TestBlockStoragePolicy {
     expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
         ", storageTypes=[SSD], creationFallbacks=[DISK], " +
         "replicationFallbacks=[DISK]}");
+    expectedPolicyStrings.put(PROVIDED, "BlockStoragePolicy{PROVIDED:" + PROVIDED +
+        ", storageTypes=[PROVIDED, DISK], creationFallbacks=[PROVIDED, DISK], " +
+        "replicationFallbacks=[PROVIDED, DISK]}");
 
     for(byte i = 1; i < 16; i++) {
       final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i); 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 30e2aaf..dd24311 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -291,7 +292,7 @@ public class TestDatanodeManager {
    */
   @Test
   public void testSortLocatedBlocks() throws IOException, URISyntaxException {
-    HelperFunction(null);
+    HelperFunction(null, 0);
   }
 
   /**
@@ -303,7 +304,7 @@ public class TestDatanodeManager {
    */
   @Test
   public void testgoodScript() throws IOException, URISyntaxException {
-    HelperFunction("/" + Shell.appendScriptExtension("topology-script"));
+    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 0);
   }
 
 
@@ -316,7 +317,21 @@ public class TestDatanodeManager {
    */
   @Test
   public void testBadScript() throws IOException, URISyntaxException {
-    HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
+    HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), 0);
+  }
+
+  /**
+   * Test the sorting functions when the set of datanodes includes
+   * datanodes with provided storage.
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  @Test
+  public void testWithProvidedTypes() throws IOException, URISyntaxException {
+    HelperFunction(null, 1);
+    HelperFunction(null, 3);
+    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 1);
+    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 2);
   }
 
   /**
@@ -324,11 +339,12 @@ public class TestDatanodeManager {
    * we invoke this function with and without topology scripts
    *
    * @param scriptFileName - Script Name or null
+   * @param providedStorages - number of provided storages to add
    *
    * @throws URISyntaxException
    * @throws IOException
    */
-  public void HelperFunction(String scriptFileName)
+  public void HelperFunction(String scriptFileName, int providedStorages)
     throws URISyntaxException, IOException {
     // create the DatanodeManager which will be tested
     Configuration conf = new Configuration();
@@ -343,17 +359,25 @@ public class TestDatanodeManager {
     }
     DatanodeManager dm = mockDatanodeManager(fsn, conf);
 
+    int totalDNs = 5 + providedStorages;
+
     // register 5 datanodes, each with different storage ID and type
-    DatanodeInfo[] locs = new DatanodeInfo[5];
-    String[] storageIDs = new String[5];
-    StorageType[] storageTypes = new StorageType[]{
-      StorageType.ARCHIVE,
-      StorageType.DEFAULT,
-      StorageType.DISK,
-      StorageType.RAM_DISK,
-      StorageType.SSD
-    };
-    for (int i = 0; i < 5; i++) {
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    List<StorageType> storageTypesList = new ArrayList<>(
+        Arrays.asList(StorageType.ARCHIVE,
+            StorageType.DEFAULT,
+            StorageType.DISK,
+            StorageType.RAM_DISK,
+            StorageType.SSD));
+
+    for (int i = 0; i < providedStorages; i++) {
+      storageTypesList.add(StorageType.PROVIDED);
+    }
+
+    StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+    for (int i = 0; i < totalDNs; i++) {
       // register new datanode
       String uuid = "UUID-" + i;
       String ip = "IP-" + i;
@@ -389,9 +413,9 @@ public class TestDatanodeManager {
     DatanodeInfo[] sortedLocs = block.getLocations();
     storageIDs = block.getStorageIDs();
     storageTypes = block.getStorageTypes();
-    assertThat(sortedLocs.length, is(5));
-    assertThat(storageIDs.length, is(5));
-    assertThat(storageTypes.length, is(5));
+    assertThat(sortedLocs.length, is(totalDNs));
+    assertThat(storageIDs.length, is(totalDNs));
+    assertThat(storageTypes.length, is(totalDNs));
     for (int i = 0; i < sortedLocs.length; i++) {
       assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageID(),
         is(storageIDs[i]));
@@ -405,6 +429,14 @@ public class TestDatanodeManager {
       is(DatanodeInfo.AdminStates.DECOMMISSIONED));
     assertThat(sortedLocs[sortedLocs.length - 2].getAdminState(),
       is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    // check that the StorageType of datanodes immediately
+    // preceding the decommissioned datanodes is PROVIDED
+    for (int i = 0; i < providedStorages; i++) {
+      assertThat(
+          ((DatanodeInfoWithStorage)
+              sortedLocs[sortedLocs.length - 3 - i]).getStorageType(),
+          is(StorageType.PROVIDED));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
new file mode 100644
index 0000000..3b75806
--- /dev/null
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Random;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class TestNameNodeProvidedImplementation {
+
+  @Rule public TestName name = new TestName();
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class);
+
+  final Random r = new Random();
+  final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
+  final Path BASE = new Path(fBASE.toURI().toString());
+  final Path NAMEPATH = new Path(BASE, "providedDir");
+  final Path NNDIRPATH = new Path(BASE, "nnDir");
+  final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv");
+  final String SINGLEUSER = "usr1";
+  final String SINGLEGROUP = "grp1";
+
+  Configuration conf;
+  MiniDFSCluster cluster;
+
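+  // Cleans the MiniDFSCluster base directory, picks and logs a random seed,
+  // configures provided storage (text block format, single-UGI resolver),
+  // and creates the local files that back the provided namespace.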
+  @Before
+  public void setSeed() throws Exception {
+    if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) {
+      throw new IOException("Could not fully delete " + fBASE);
+    }
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    System.out.println(name.getMethodName() + " seed: " + seed);
+    conf = new HdfsConfiguration();
+    conf.set(SingleUGIResolver.USER, SINGLEUSER);
+    conf.set(SingleUGIResolver.GROUP, SINGLEGROUP);
+
+    conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+        DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
+
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
+        BlockFormatProvider.class, BlockProvider.class);
+    conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+        TextFileRegionProvider.class, FileRegionProvider.class);
+    conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+        TextFileRegionFormat.class, BlockFormat.class);
+
+    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH,
+        BLOCKFILE.toString());
+    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
+        BLOCKFILE.toString());
+    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
+
+    File imageDir = new File(NAMEPATH.toUri());
+    if (!imageDir.exists()) {
+      LOG.info("Creating directory: " + imageDir);
+      imageDir.mkdirs();
+    }
+
+    File nnDir = new File(NNDIRPATH.toUri());
+    if (!nnDir.exists()) {
+      nnDir.mkdirs();
+    }
+
+    // create 10 files of increasing length under NAMEPATH
+    for (int i = 0; i < 10; i++) {
+      File newFile = new File(new Path(NAMEPATH, "file" + i).toUri());
+      if (!newFile.exists()) {
+        try {
+          LOG.info("Creating " + newFile.toString());
+          newFile.createNewFile();
+          Writer writer = new OutputStreamWriter(
+              new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
+          for (int j = 0; j < 10 * i; j++) {
+            writer.write("0");
+          }
+          writer.flush();
+          writer.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    try {
+      if (cluster != null) {
+        cluster.shutdown(true, true);
+      }
+    } finally {
+      cluster = null;
+    }
+  }
+
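+  /**
+   * Write an FSImage for the given tree into {@code out}, using the
+   * text-file block format and the supplied block resolver.
+   */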
+  void createImage(TreeWalk t, Path out,
+      Class<? extends BlockResolver> blockIdsClass) throws Exception {
+    ImageWriter.Options opts = ImageWriter.defaults();
+    opts.setConf(conf);
+    opts.output(out.toString())
+        .blocks(TextFileRegionFormat.class)
+        .blockIds(blockIdsClass);
+    try (ImageWriter w = new ImageWriter(opts)) {
+      for (TreePath e : t) {
+        w.accept(e);
+      }
+    }
+  }
+
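+  /**
+   * Start a MiniDFSCluster over {@code nspath} with {@code numDatanodes}
+   * datanodes; per-datanode storage types, if given, take precedence over
+   * the uniform storage-type layout.
+   */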
+  void startCluster(Path nspath, int numDatanodes,
+      StorageType[] storageTypes,
+      StorageType[][] storageTypesPerDatanode)
+      throws IOException {
+    conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
+
+    if (storageTypesPerDatanode != null) {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .format(false)
+          .manageNameDfsDirs(false)
+          .numDataNodes(numDatanodes)
+          .storageTypes(storageTypesPerDatanode)
+          .build();
+    } else if (storageTypes != null) {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .format(false)
+          .manageNameDfsDirs(false)
+          .numDataNodes(numDatanodes)
+          .storagesPerDatanode(storageTypes.length)
+          .storageTypes(storageTypes)
+          .build();
+    } else {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .format(false)
+          .manageNameDfsDirs(false)
+          .numDataNodes(numDatanodes)
+          .build();
+    }
+    cluster.waitActive();
+  }
+
+  @Test(timeout = 20000)
+  public void testLoadImage() throws Exception {
+    final long seed = r.nextLong();
+    LOG.info("NAMEPATH: " + NAMEPATH);
+    createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
+    startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED}, null);
+
+    FileSystem fs = cluster.getFileSystem();
+    for (TreePath e : new RandomTreeWalk(seed)) {
+      FileStatus rs = e.getFileStatus();
+      Path hp = new Path(rs.getPath().toUri().getPath());
+      assertTrue(fs.exists(hp));
+      FileStatus hs = fs.getFileStatus(hp);
+      assertEquals(rs.getPath().toUri().getPath(),
+                   hs.getPath().toUri().getPath());
+      assertEquals(rs.getPermission(), hs.getPermission());
+      assertEquals(rs.getLen(), hs.getLen());
+      assertEquals(SINGLEUSER, hs.getOwner());
+      assertEquals(SINGLEGROUP, hs.getGroup());
+      assertEquals(rs.getAccessTime(), hs.getAccessTime());
+      assertEquals(rs.getModificationTime(), hs.getModificationTime());
+    }
+  }
+
+  @Test(timeout=20000)
+  public void testBlockLoad() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS,
+        SingleUGIResolver.class, UGIResolver.class);
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED}, null);
+  }
+
+  @Test(timeout=500000)
+  public void testDefaultReplication() throws Exception {
+    int targetReplication = 2;
+    conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockMultiReplicaResolver.class);
+    // give the last datanode DISK storage only
+    startCluster(NNDIRPATH, 3, null,
+        new StorageType[][] {
+          {StorageType.PROVIDED},
+          {StorageType.PROVIDED},
+          {StorageType.DISK}}
+        );
+    // wait for the replication to finish
+    Thread.sleep(50000);
+
+    FileSystem fs = cluster.getFileSystem();
+    int count = 0;
+    for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
+      FileStatus rs = e.getFileStatus();
+      Path hp = removePrefix(NAMEPATH, rs.getPath());
+      LOG.info("hp " + hp.toUri().getPath());
+      // skip HDFS-specific files, which may have been created later on.
+      if (hp.toString().contains("in_use.lock")
+          || hp.toString().contains("current")) {
+        continue;
+      }
+      e.accept(count++);
+      assertTrue(fs.exists(hp));
+      FileStatus hs = fs.getFileStatus(hp);
+
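+      // every block of the file should have reached the target replication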
+      if (rs.isFile()) {
+        BlockLocation[] bl = fs.getFileBlockLocations(
+            hs.getPath(), 0, hs.getLen());
+        int i = 0;
+        for (; i < bl.length; i++) {
+          int currentRep = bl[i].getHosts().length;
+          assertEquals(targetReplication, currentRep);
+        }
+      }
+    }
+  }
+
+
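+  /**
+   * Strip {@code base} from {@code walk} and return the remainder as an
+   * absolute path, e.g. removePrefix(/a/b, /a/b/c/d) yields /c/d.
+   */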
+  static Path removePrefix(Path base, Path walk) {
+    Path wpath = new Path(walk.toUri().getPath());
+    Path bpath = new Path(base.toUri().getPath());
+    Path ret = new Path("/");
+    while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) {
+      ret = "".equals(ret.getName())
+        ? new Path("/", wpath.getName())
+        : new Path(new Path("/", wpath.getName()),
+                   new Path(ret.toString().substring(1)));
+      wpath = wpath.getParent();
+    }
+    if (!bpath.equals(wpath)) {
+      throw new IllegalArgumentException(base + " not a prefix of " + walk);
+    }
+    return ret;
+  }
+
+  @Test(timeout=30000)
+  public void testBlockRead() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS,
+        FsUGIResolver.class, UGIResolver.class);
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED}, null);
+    FileSystem fs = cluster.getFileSystem();
+    Thread.sleep(2000);
+    int count = 0;
+    // read NN metadata, verify contents match
+    for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
+      FileStatus rs = e.getFileStatus();
+      Path hp = removePrefix(NAMEPATH, rs.getPath());
+      LOG.info("hp " + hp.toUri().getPath());
+      // skip HDFS-specific files, which may have been created later on.
+      if (hp.toString().contains("in_use.lock")
+          || hp.toString().contains("current")) {
+        continue;
+      }
+      e.accept(count++);
+      assertTrue(fs.exists(hp));
+      FileStatus hs = fs.getFileStatus(hp);
+      assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath());
+      assertEquals(rs.getPermission(), hs.getPermission());
+      assertEquals(rs.getOwner(), hs.getOwner());
+      assertEquals(rs.getGroup(), hs.getGroup());
+
+      if (rs.isFile()) {
+        assertEquals(rs.getLen(), hs.getLen());
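+        // compare the local file and the HDFS copy byte-by-byte,
+        // 4 KB at a time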
+        try (ReadableByteChannel i = Channels.newChannel(
+              new FileInputStream(new File(rs.getPath().toUri())))) {
+          try (ReadableByteChannel j = Channels.newChannel(
+                fs.open(hs.getPath()))) {
+            ByteBuffer ib = ByteBuffer.allocate(4096);
+            ByteBuffer jb = ByteBuffer.allocate(4096);
+            while (true) {
+              int il = i.read(ib);
+              int jl = j.read(jb);
+              if (il < 0 || jl < 0) {
+                assertEquals(il, jl);
+                break;
+              }
+              ib.flip();
+              jb.flip();
+              int cmp = Math.min(ib.remaining(), jb.remaining());
+              for (int k = 0; k < cmp; ++k) {
+                assertEquals(ib.get(), jb.get());
+              }
+              ib.compact();
+              jb.compact();
+            }
+
+          }
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org