Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/05/11 21:42:46 UTC
[50/50] [abbrv] hadoop git commit: HDFS-11190. [READ] Namenode support for data stored in external stores.
HDFS-11190. [READ] Namenode support for data stored in external stores.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77dc3a0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77dc3a0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77dc3a0f
Branch: refs/heads/HDFS-9806
Commit: 77dc3a0f63bec431013a3c5f271ae6bc25e644af
Parents: b3bb531
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Apr 21 11:12:36 2017 -0700
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Thu May 11 14:41:16 2017 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/protocol/LocatedBlock.java | 96 ++++-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../blockmanagement/BlockFormatProvider.java | 91 ++++
.../server/blockmanagement/BlockManager.java | 95 +++--
.../server/blockmanagement/BlockProvider.java | 65 +++
.../BlockStoragePolicySuite.java | 6 +
.../blockmanagement/DatanodeDescriptor.java | 34 +-
.../server/blockmanagement/DatanodeManager.java | 2 +
.../blockmanagement/DatanodeStorageInfo.java | 4 +
.../blockmanagement/LocatedBlockBuilder.java | 109 +++++
.../blockmanagement/ProvidedStorageMap.java | 427 +++++++++++++++++++
.../src/main/resources/hdfs-default.xml | 30 +-
.../hadoop/hdfs/TestBlockStoragePolicy.java | 4 +
.../blockmanagement/TestDatanodeManager.java | 66 ++-
.../TestNameNodeProvidedImplementation.java | 345 +++++++++++++++
15 files changed, 1293 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 85bec92..5ad0bca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -62,40 +63,50 @@ public class LocatedBlock {
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
// By default, startOffset is unknown (-1) and corrupt is false.
- this(b, locs, null, null, -1, false, EMPTY_LOCS);
+ this(b, convert(locs, null, null), null, null, -1, false, EMPTY_LOCS);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
String[] storageIDs, StorageType[] storageTypes) {
- this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS);
+ this(b, convert(locs, storageIDs, storageTypes),
+ storageIDs, storageTypes, -1, false, EMPTY_LOCS);
}
- public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
- StorageType[] storageTypes, long startOffset,
+ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
+ String[] storageIDs, StorageType[] storageTypes, long startOffset,
+ boolean corrupt, DatanodeInfo[] cachedLocs) {
+ this(b, convert(locs, storageIDs, storageTypes),
+ storageIDs, storageTypes, startOffset, corrupt,
+ null == cachedLocs || 0 == cachedLocs.length ? EMPTY_LOCS : cachedLocs);
+ }
+
+ public LocatedBlock(ExtendedBlock b, DatanodeInfoWithStorage[] locs,
+ String[] storageIDs, StorageType[] storageTypes, long startOffset,
boolean corrupt, DatanodeInfo[] cachedLocs) {
this.b = b;
this.offset = startOffset;
this.corrupt = corrupt;
- if (locs==null) {
- this.locs = EMPTY_LOCS;
- } else {
- this.locs = new DatanodeInfoWithStorage[locs.length];
- for(int i = 0; i < locs.length; i++) {
- DatanodeInfo di = locs[i];
- DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
- storageIDs != null ? storageIDs[i] : null,
- storageTypes != null ? storageTypes[i] : null);
- this.locs[i] = storage;
- }
- }
+ this.locs = null == locs ? EMPTY_LOCS : locs;
this.storageIDs = storageIDs;
this.storageTypes = storageTypes;
+ this.cachedLocs = null == cachedLocs || 0 == cachedLocs.length
+ ? EMPTY_LOCS
+ : cachedLocs;
+ }
+
+ private static DatanodeInfoWithStorage[] convert(
+ DatanodeInfo[] infos, String[] storageIDs, StorageType[] storageTypes) {
+ if (null == infos) {
+ return EMPTY_LOCS;
+ }
- if (cachedLocs == null || cachedLocs.length == 0) {
- this.cachedLocs = EMPTY_LOCS;
- } else {
- this.cachedLocs = cachedLocs;
+ DatanodeInfoWithStorage[] ret = new DatanodeInfoWithStorage[infos.length];
+ for(int i = 0; i < infos.length; i++) {
+ ret[i] = new DatanodeInfoWithStorage(infos[i],
+ storageIDs != null ? storageIDs[i] : null,
+ storageTypes != null ? storageTypes[i] : null);
}
+ return ret;
}
public Token<BlockTokenIdentifier> getBlockToken() {
@@ -145,6 +156,51 @@ public class LocatedBlock {
}
}
+ /**
+ * Comparator that orders locations with a PROVIDED storage type after
+ * all others; all non-PROVIDED storage types compare as equal.
+ */
+ private class ProvidedLastComparator
+ implements Comparator<DatanodeInfoWithStorage> {
+ @Override
+ public int compare(DatanodeInfoWithStorage dns1,
+ DatanodeInfoWithStorage dns2) {
+ if (StorageType.PROVIDED.equals(dns1.getStorageType())
+ && !StorageType.PROVIDED.equals(dns2.getStorageType())) {
+ return 1;
+ }
+ if (!StorageType.PROVIDED.equals(dns1.getStorageType())
+ && StorageType.PROVIDED.equals(dns2.getStorageType())) {
+ return -1;
+ }
+ // dns1 and dns2 are now either both PROVIDED or both non-PROVIDED;
+ // they are considered equal for the purposes of this comparator.
+ return 0;
+ }
+ }
+
+ /**
+ * Moves all locations that have {@link StorageType}
+ * {@code PROVIDED} to the end of the locations array without
+ * changing the relative ordering of the remaining locations.
+ * Only the first {@code activeLen} locations are considered.
+ * The caller must immediately invoke {@link
+ * org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
+ * to update the cached Storage ID/Type arrays.
+ * @param activeLen the number of locations to consider, starting from
+ * the beginning of the array
+ */
+ public void moveProvidedToEnd(int activeLen) {
+
+ if (activeLen <= 0) {
+ return;
+ }
+ // as this is a stable sort, for elements that are equal,
+ // the current order of the elements is maintained
+ Arrays.sort(locs, 0,
+ (activeLen < locs.length) ? activeLen : locs.length,
+ new ProvidedLastComparator());
+ }
+
public long getStartOffset() {
return offset;
}
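The new ProvidedLastComparator relies on Arrays.sort being a stable sort: only PROVIDED locations sink to the end, and all other locations keep the relative order that DatanodeManager established by network distance. A self-contained sketch of the same idea, using a stand-in enum rather than the HDFS types:

    import java.util.Arrays;
    import java.util.Comparator;

    public class ProvidedLastDemo {
      // Stand-in for org.apache.hadoop.fs.StorageType; illustration only.
      enum StorageType { DISK, SSD, PROVIDED }

      public static void main(String[] args) {
        StorageType[] locs = {
            StorageType.PROVIDED, StorageType.DISK,
            StorageType.SSD, StorageType.PROVIDED };
        // Stable sort on "is PROVIDED": false sorts before true, so
        // PROVIDED entries move to the end without reordering the rest.
        Arrays.sort(locs,
            Comparator.comparing((StorageType t) -> t == StorageType.PROVIDED));
        System.out.println(Arrays.toString(locs));
        // prints: [DISK, SSD, PROVIDED, PROVIDED]
      }
    }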
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6406d35..e252b6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -314,6 +314,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.edits.asynclogging";
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
+ public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
+ public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;
+
+ public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class";
+
public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
new file mode 100644
index 0000000..930263d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.BlockAlias;
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads provided blocks from a {@link BlockFormat}.
+ */
+public class BlockFormatProvider extends BlockProvider
+ implements Configurable {
+
+ private Configuration conf;
+ private BlockFormat<? extends BlockAlias> blockFormat;
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockFormatProvider.class);
+
+ @Override
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void setConf(Configuration conf) {
+ Class<? extends BlockFormat> c = conf.getClass(
+ DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+ TextFileRegionFormat.class, BlockFormat.class);
+ blockFormat = ReflectionUtils.newInstance(c, conf);
+ LOG.info("Loaded BlockFormat class : " + c.getClass().getName());
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Iterator<Block> iterator() {
+ try {
+ final BlockFormat.Reader<? extends BlockAlias> reader =
+ blockFormat.getReader(null);
+
+ return new Iterator<Block>() {
+
+ private final Iterator<? extends BlockAlias> inner = reader.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public Block next() {
+ return inner.next().getBlock();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read provided blocks", e);
+ }
+ }
+
+}
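setConf() resolves the BlockFormat implementation through the standard Configuration.getClass() lookup, falling back to TextFileRegionFormat, so alternate formats can be plugged in without code changes. A hedged sketch of that wiring (CsvBlockFormat is hypothetical, named only for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
    import org.apache.hadoop.hdfs.server.common.BlockFormat;

    Configuration conf = new Configuration();
    // CsvBlockFormat is a hypothetical BlockFormat subclass.
    conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
        CsvBlockFormat.class, BlockFormat.class);
    BlockFormatProvider provider = new BlockFormatProvider();
    provider.setConf(conf); // instantiates CsvBlockFormat via ReflectionUtils
    // provider.iterator() then streams Block objects read by that format.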
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a9592bf..c5b9c8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -375,6 +375,9 @@ public class BlockManager implements BlockStatsMXBean {
*/
private final short minReplicationToBeInMaintenance;
+ /** Storages accessible from multiple DNs. */
+ private final ProvidedStorageMap providedStorageMap;
+
public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -407,6 +410,8 @@ public class BlockManager implements BlockStatsMXBean {
blockTokenSecretManager = createBlockTokenSecretManager(conf);
+ providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
+
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
@@ -1053,7 +1058,7 @@ public class BlockManager implements BlockStatsMXBean {
final long fileLength = bc.computeContentSummary(
getStoragePolicySuite()).getLength();
final long pos = fileLength - lastBlock.getNumBytes();
- return createLocatedBlock(lastBlock, pos,
+ return createLocatedBlock(null, lastBlock, pos,
BlockTokenIdentifier.AccessMode.WRITE);
}
@@ -1074,8 +1079,10 @@ public class BlockManager implements BlockStatsMXBean {
return locations;
}
- private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
- final long offset, final long length, final int nrBlocksToReturn,
+ private void createLocatedBlockList(
+ LocatedBlockBuilder locatedBlocks,
+ final BlockInfo[] blocks,
+ final long offset, final long length,
final AccessMode mode) throws IOException {
int curBlk;
long curPos = 0, blkSize = 0;
@@ -1090,21 +1097,22 @@ public class BlockManager implements BlockStatsMXBean {
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return Collections.emptyList();
+ return;
long endOff = offset + length;
- List<LocatedBlock> results = new ArrayList<>(blocks.length);
do {
- results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
+ locatedBlocks.addBlock(
+ createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
&& curBlk < blocks.length
- && results.size() < nrBlocksToReturn);
- return results;
+ && !locatedBlocks.isBlockMax());
+ return;
}
- private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
+ private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+ final BlockInfo[] blocks,
final long endPos, final AccessMode mode) throws IOException {
int curBlk;
long curPos = 0;
@@ -1117,12 +1125,13 @@ public class BlockManager implements BlockStatsMXBean {
curPos += blkSize;
}
- return createLocatedBlock(blocks[curBlk], curPos, mode);
+ return createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode);
}
- private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
- final AccessMode mode) throws IOException {
- final LocatedBlock lb = createLocatedBlock(blk, pos);
+ private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+ final BlockInfo blk, final long pos, final AccessMode mode)
+ throws IOException {
+ final LocatedBlock lb = createLocatedBlock(locatedBlocks, blk, pos);
if (mode != null) {
setBlockToken(lb, mode);
}
@@ -1130,21 +1139,24 @@ public class BlockManager implements BlockStatsMXBean {
}
/** @return a LocatedBlock for the given block */
- private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
- throws IOException {
+ private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks,
+ final BlockInfo blk, final long pos) throws IOException {
if (!blk.isComplete()) {
final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature();
if (blk.isStriped()) {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
+ //TODO use locatedBlocks builder??
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false);
} else {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
blk);
- return newLocatedBlock(eb, storages, pos, false);
+ return null == locatedBlocks
+ ? newLocatedBlock(eb, storages, pos, false)
+ : locatedBlocks.newLocatedBlock(eb, storages, pos, false);
}
}
@@ -1208,9 +1220,10 @@ public class BlockManager implements BlockStatsMXBean {
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk);
- return blockIndices == null ?
- newLocatedBlock(eb, machines, pos, isCorrupt) :
- newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
+ return blockIndices == null
+ ? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt)
+ : locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt)
+ : newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}
/** Create a LocatedBlocks. */
@@ -1232,27 +1245,31 @@ public class BlockManager implements BlockStatsMXBean {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
}
final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
- final List<LocatedBlock> locatedblocks = createLocatedBlockList(
- blocks, offset, length, Integer.MAX_VALUE, mode);
- final LocatedBlock lastlb;
- final boolean isComplete;
+ LocatedBlockBuilder locatedBlocks = providedStorageMap
+ .newLocatedBlocks(Integer.MAX_VALUE)
+ .fileLength(fileSizeExcludeBlocksUnderConstruction)
+ .lastUC(isFileUnderConstruction)
+ .encryption(feInfo)
+ .erasureCoding(ecPolicy);
+
+ createLocatedBlockList(locatedBlocks, blocks, offset, length, mode);
if (!inSnapshot) {
final BlockInfo last = blocks[blocks.length - 1];
final long lastPos = last.isComplete()?
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
: fileSizeExcludeBlocksUnderConstruction;
- lastlb = createLocatedBlock(last, lastPos, mode);
- isComplete = last.isComplete();
+
+ locatedBlocks
+ .lastBlock(createLocatedBlock(locatedBlocks, last, lastPos, mode))
+ .lastComplete(last.isComplete());
} else {
- lastlb = createLocatedBlock(blocks,
- fileSizeExcludeBlocksUnderConstruction, mode);
- isComplete = true;
+ locatedBlocks
+ .lastBlock(createLocatedBlock(locatedBlocks, blocks,
+ fileSizeExcludeBlocksUnderConstruction, mode))
+ .lastComplete(true);
}
- LocatedBlocks locations = new LocatedBlocks(
- fileSizeExcludeBlocksUnderConstruction,
- isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
- ecPolicy);
+ LocatedBlocks locations = locatedBlocks.build();
// Set caching information for the located blocks.
CacheManager cm = namesystem.getCacheManager();
if (cm != null) {
@@ -2336,7 +2353,10 @@ public class BlockManager implements BlockStatsMXBean {
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
- DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+ // Register the DN with the provided storage, not with storage owned
+ // by the DN; the DN should still keep a reference to the
+ // DatanodeStorageInfo.
+ DatanodeStorageInfo storageInfo =
+ providedStorageMap.getStorage(node, storage);
if (storageInfo == null) {
// We handle this for backwards compatibility.
@@ -2368,9 +2388,12 @@ public class BlockManager implements BlockStatsMXBean {
nodeID.getDatanodeUuid());
processFirstBlockReport(storageInfo, newReport);
} else {
- invalidatedBlocks = processReport(storageInfo, newReport, context);
+ // Block reports for provided storage are not
+ // maintained by DN heartbeats
+ if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
+ invalidatedBlocks = processReport(storageInfo, newReport, context);
+ }
}
-
storageInfo.receivedBlockReport();
} finally {
endTime = Time.monotonicNow();
@@ -2589,7 +2612,7 @@ public class BlockManager implements BlockStatsMXBean {
* @param report - the initial block report, to be processed
* @throws IOException
*/
- private void processFirstBlockReport(
+ void processFirstBlockReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
new file mode 100644
index 0000000..d8bed16
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to load provided blocks in the {@link BlockManager}.
+ */
+public abstract class BlockProvider implements Iterable<Block> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockProvider.class);
+
+ private RwLock lock;
+ private BlockManager bm;
+ private DatanodeStorageInfo storage;
+ private boolean hasDNs = false;
+
+ /**
+ * @param lock the namesystem lock
+ * @param bm block manager
+ * @param storage storage for provided blocks
+ */
+ void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
+ this.bm = bm;
+ this.lock = lock;
+ this.storage = storage;
+ }
+
+ /**
+ * Start processing the block report for provided blocks.
+ * @throws IOException
+ */
+ void start() throws IOException {
+ assert lock.hasWriteLock() : "Not holding write lock";
+ if (hasDNs) {
+ return;
+ }
+ LOG.info("Calling process first blk report from storage: " + storage);
+ // first pass; periodic refresh should call bm.processReport
+ bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
+ hasDNs = true;
+ }
+}
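BlockProvider leaves only the Iterable&lt;Block&gt; contract to subclasses; init() and start() handle the wiring to the BlockManager and the one-shot synthetic block report. A minimal illustrative subclass, assuming a fixed in-memory block list (the block values are placeholders):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;

    // Illustration only: serves a fixed set of provided blocks.
    public class StaticBlockProvider extends BlockProvider {
      private final Block[] blocks = {
          new Block(1L, 1024L, 1001L), // blockId, numBytes, genStamp
          new Block(2L, 2048L, 1001L),
      };

      @Override
      public Iterator<Block> iterator() {
        return Arrays.asList(blocks).iterator();
      }
    }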
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
index c8923da..6ea5198 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
@@ -82,6 +82,12 @@ public class BlockStoragePolicySuite {
HdfsConstants.COLD_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
StorageType.EMPTY_ARRAY);
+ final byte providedId = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
+ policies[providedId] = new BlockStoragePolicy(providedId,
+ HdfsConstants.PROVIDED_STORAGE_POLICY_NAME,
+ new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+ new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+ new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
return new BlockStoragePolicySuite(hotId, policies);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 4b87fd4..ab16f1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -151,7 +151,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
private final LeavingServiceStatus leavingServiceStatus =
new LeavingServiceStatus();
- private final Map<String, DatanodeStorageInfo> storageMap =
+ protected final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
/**
@@ -322,6 +322,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
boolean hasStaleStorages() {
synchronized (storageMap) {
for (DatanodeStorageInfo storage : storageMap.values()) {
+ if (StorageType.PROVIDED.equals(storage.getStorageType())) {
+ // Skip provided storage: verifying that it participated in this
+ // heartbeat would require passing the DatanodeDescriptor down,
+ // e.g., storageInfo.verifyBlockReportId(this, curBlockReportId).
+ continue;
+ }
if (storage.areBlockContentsStale()) {
return true;
}
@@ -439,17 +445,22 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
for (StorageReport report : reports) {
+ totalCapacity += report.getCapacity();
+ totalRemaining += report.getRemaining();
+ totalBlockPoolUsed += report.getBlockPoolUsed();
+ totalDfsUsed += report.getDfsUsed();
+ totalNonDfsUsed += report.getNonDfsUsed();
+
+ if (StorageType.PROVIDED.equals(
+ report.getStorage().getStorageType())) {
+ continue;
+ }
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
failedStorageInfos.remove(storage);
}
storage.receivedHeartbeat(report);
- totalCapacity += report.getCapacity();
- totalRemaining += report.getRemaining();
- totalBlockPoolUsed += report.getBlockPoolUsed();
- totalDfsUsed += report.getDfsUsed();
- totalNonDfsUsed += report.getNonDfsUsed();
}
rollBlocksScheduled(getLastUpdateMonotonic());
@@ -471,6 +482,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
+ void injectStorage(DatanodeStorageInfo s) {
+ synchronized (storageMap) {
+ DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+ if (null == storage) {
+ storageMap.put(s.getStorageID(), s);
+ } else {
+ assert storage == s : "found " + storage + " expected " + s;
+ }
+ }
+ }
+
/**
* Remove stale storages from storageMap. We must not remove any storages
* as long as they have associated block replicas.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 7dcc9fd..038aaf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -525,6 +525,8 @@ public class DatanodeManager {
} else {
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
}
+ // Move PROVIDED storages to the end to prefer local replicas.
+ lb.moveProvidedToEnd(activeLen);
// must update cache since we modified locations array
lb.updateCachedStorageInfo();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8af86d3..ed3905f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -172,6 +172,10 @@ public class DatanodeStorageInfo {
this.state = state;
}
+ void setHeartbeatedSinceFailover(boolean value) {
+ heartbeatedSinceFailover = value;
+ }
+
boolean areBlocksOnFailedStorage() {
return getState() == State.FAILED && !blocks.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
new file mode 100644
index 0000000..0056887
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class LocatedBlockBuilder {
+
+ protected long flen;
+ protected List<LocatedBlock> blocks = Collections.<LocatedBlock>emptyList();
+ protected boolean isUC;
+ protected LocatedBlock last;
+ protected boolean lastComplete;
+ protected FileEncryptionInfo feInfo;
+ private final int maxBlocks;
+ protected ErasureCodingPolicy ecPolicy;
+
+ LocatedBlockBuilder(int maxBlocks) {
+ this.maxBlocks = maxBlocks;
+ }
+
+ boolean isBlockMax() {
+ return blocks.size() >= maxBlocks;
+ }
+
+ LocatedBlockBuilder fileLength(long fileLength) {
+ flen = fileLength;
+ return this;
+ }
+
+ LocatedBlockBuilder addBlock(LocatedBlock block) {
+ if (blocks.isEmpty()) {
+ blocks = new ArrayList<>();
+ }
+ blocks.add(block);
+ return this;
+ }
+
+ // return new block so tokens can be set
+ LocatedBlock newLocatedBlock(ExtendedBlock eb,
+ DatanodeStorageInfo[] storage,
+ long pos, boolean isCorrupt) {
+ LocatedBlock blk =
+ BlockManager.newLocatedBlock(eb, storage, pos, isCorrupt);
+ return blk;
+ }
+
+ LocatedBlockBuilder lastUC(boolean underConstruction) {
+ isUC = underConstruction;
+ return this;
+ }
+
+ LocatedBlockBuilder lastBlock(LocatedBlock block) {
+ last = block;
+ return this;
+ }
+
+ LocatedBlockBuilder lastComplete(boolean complete) {
+ lastComplete = complete;
+ return this;
+ }
+
+ LocatedBlockBuilder encryption(FileEncryptionInfo fileEncryptionInfo) {
+ feInfo = fileEncryptionInfo;
+ return this;
+ }
+
+ LocatedBlockBuilder erasureCoding(ErasureCodingPolicy codingPolicy) {
+ ecPolicy = codingPolicy;
+ return this;
+ }
+
+ LocatedBlocks build(DatanodeDescriptor client) {
+ return build();
+ }
+
+ LocatedBlocks build() {
+ return new LocatedBlocks(flen, isUC, blocks, last,
+ lastComplete, feInfo, ecPolicy);
+ }
+
+}
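For reference, BlockManager.createLocatedBlocks() (changed earlier in this commit) drives the builder with a fluent chain; a condensed sketch of that flow, with placeholder values:

    // fileSize, isUnderConstruction, feInfo, ecPolicy, locatedBlock and
    // lastLocatedBlock are placeholders standing in for BlockManager state.
    LocatedBlockBuilder builder = new LocatedBlockBuilder(Integer.MAX_VALUE)
        .fileLength(fileSize)
        .lastUC(isUnderConstruction)
        .encryption(feInfo)
        .erasureCoding(ecPolicy);
    builder.addBlock(locatedBlock);      // repeated until isBlockMax()
    builder.lastBlock(lastLocatedBlock)
        .lastComplete(true);
    LocatedBlocks locations = builder.build();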
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
new file mode 100644
index 0000000..d222344
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * This class allows us to manage and multiplex between storages local to
+ * datanodes, and provided storage.
+ */
+public class ProvidedStorageMap {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ProvidedStorageMap.class);
+
+ // limit to a single provider for now
+ private final BlockProvider blockProvider;
+ private final String storageId;
+ private final ProvidedDescriptor providedDescriptor;
+ private final DatanodeStorageInfo providedStorageInfo;
+ private boolean providedEnabled;
+
+ ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
+ throws IOException {
+
+ storageId = conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+ DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
+
+ providedEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED,
+ DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT);
+
+ if (!providedEnabled) {
+ // disable mapping
+ blockProvider = null;
+ providedDescriptor = null;
+ providedStorageInfo = null;
+ return;
+ }
+
+ DatanodeStorage ds = new DatanodeStorage(
+ storageId, State.NORMAL, StorageType.PROVIDED);
+ providedDescriptor = new ProvidedDescriptor();
+ providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
+
+ // load block reader into storage
+ Class<? extends BlockProvider> fmt = conf.getClass(
+ DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
+ BlockFormatProvider.class, BlockProvider.class);
+
+ blockProvider = ReflectionUtils.newInstance(fmt, conf);
+ blockProvider.init(lock, bm, providedStorageInfo);
+ LOG.info("Loaded block provider class: " +
+ blockProvider.getClass() + " storage: " + providedStorageInfo);
+ }
+
+ /**
+ * @param dn datanode descriptor
+ * @param s data node storage
+ * @return the {@link DatanodeStorageInfo} for the specified datanode.
+ * If {@code s} corresponds to a provided storage, the storage info
+ * representing provided storage is returned.
+ * @throws IOException
+ */
+ DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
+ throws IOException {
+ if (providedEnabled && storageId.equals(s.getStorageID())) {
+ if (StorageType.PROVIDED.equals(s.getStorageType())) {
+ // poll service, initiate
+ blockProvider.start();
+ dn.injectStorage(providedStorageInfo);
+ return providedDescriptor.getProvidedStorage(dn, s);
+ }
+ LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn);
+ }
+ return dn.getStorageInfo(s.getStorageID());
+ }
+
+ public LocatedBlockBuilder newLocatedBlocks(int maxValue) {
+ if (!providedEnabled) {
+ return new LocatedBlockBuilder(maxValue);
+ }
+ return new ProvidedBlocksBuilder(maxValue);
+ }
+
+ /**
+ * Builder used for creating {@link LocatedBlocks} when a block is provided.
+ */
+ class ProvidedBlocksBuilder extends LocatedBlockBuilder {
+
+ private ShadowDatanodeInfoWithStorage pending;
+
+ ProvidedBlocksBuilder(int maxBlocks) {
+ super(maxBlocks);
+ pending = new ShadowDatanodeInfoWithStorage(
+ providedDescriptor, storageId);
+ }
+
+ @Override
+ LocatedBlock newLocatedBlock(ExtendedBlock eb,
+ DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
+
+ DatanodeInfoWithStorage[] locs =
+ new DatanodeInfoWithStorage[storages.length];
+ String[] sids = new String[storages.length];
+ StorageType[] types = new StorageType[storages.length];
+ for (int i = 0; i < storages.length; ++i) {
+ sids[i] = storages[i].getStorageID();
+ types[i] = storages[i].getStorageType();
+ if (StorageType.PROVIDED.equals(storages[i].getStorageType())) {
+ locs[i] = pending;
+ } else {
+ locs[i] = new DatanodeInfoWithStorage(
+ storages[i].getDatanodeDescriptor(), sids[i], types[i]);
+ }
+ }
+ return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
+ }
+
+ @Override
+ LocatedBlocks build(DatanodeDescriptor client) {
+ // TODO: to support multiple provided storages, need to pass/maintain map
+ // set all fields of pending DatanodeInfo
+ List<String> excludedUUids = new ArrayList<String>();
+ for (LocatedBlock b: blocks) {
+ DatanodeInfo[] infos = b.getLocations();
+ StorageType[] types = b.getStorageTypes();
+
+ for (int i = 0; i < types.length; i++) {
+ if (!StorageType.PROVIDED.equals(types[i])) {
+ excludedUUids.add(infos[i].getDatanodeUuid());
+ }
+ }
+ }
+
+ DatanodeDescriptor dn = providedDescriptor.choose(client, excludedUUids);
+ if (dn == null) {
+ dn = providedDescriptor.choose(client);
+ }
+
+ pending.replaceInternal(dn);
+ return new LocatedBlocks(
+ flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
+ }
+
+ @Override
+ LocatedBlocks build() {
+ return build(providedDescriptor.chooseRandom());
+ }
+ }
+
+ /**
+ * An abstract {@link DatanodeInfoWithStorage} to represent provided storage.
+ */
+ static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
+ private String shadowUuid;
+
+ ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
+ super(d, storageId, StorageType.PROVIDED);
+ }
+
+ @Override
+ public String getDatanodeUuid() {
+ return shadowUuid;
+ }
+
+ public void setDatanodeUuid(String uuid) {
+ shadowUuid = uuid;
+ }
+
+ void replaceInternal(DatanodeDescriptor dn) {
+ updateRegInfo(dn); // overwrite DatanodeID (except UUID)
+ setDatanodeUuid(dn.getDatanodeUuid());
+ setCapacity(dn.getCapacity());
+ setDfsUsed(dn.getDfsUsed());
+ setRemaining(dn.getRemaining());
+ setBlockPoolUsed(dn.getBlockPoolUsed());
+ setCacheCapacity(dn.getCacheCapacity());
+ setCacheUsed(dn.getCacheUsed());
+ setLastUpdate(dn.getLastUpdate());
+ setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
+ setXceiverCount(dn.getXceiverCount());
+ setNetworkLocation(dn.getNetworkLocation());
+ adminState = dn.getAdminState();
+ setUpgradeDomain(dn.getUpgradeDomain());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ /**
+ * An abstract DatanodeDescriptor to track datanodes with provided storages.
+ * NOTE: never resolved through registerDatanode, so not in the topology.
+ */
+ static class ProvidedDescriptor extends DatanodeDescriptor {
+
+ private final NavigableMap<String, DatanodeDescriptor> dns =
+ new ConcurrentSkipListMap<>();
+
+ ProvidedDescriptor() {
+ super(new DatanodeID(
+ null, // String ipAddr,
+ null, // String hostName,
+ UUID.randomUUID().toString(), // String datanodeUuid,
+ 0, // int xferPort,
+ 0, // int infoPort,
+ 0, // int infoSecurePort,
+ 0)); // int ipcPort
+ }
+
+ DatanodeStorageInfo getProvidedStorage(
+ DatanodeDescriptor dn, DatanodeStorage s) {
+ dns.put(dn.getDatanodeUuid(), dn);
+ // TODO: maintain separate RPC ident per dn
+ return storageMap.get(s.getStorageID());
+ }
+
+ DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
+ assert null == storageMap.get(ds.getStorageID());
+ DatanodeStorageInfo storage = new DatanodeStorageInfo(this, ds);
+ storage.setHeartbeatedSinceFailover(true);
+ storageMap.put(storage.getStorageID(), storage);
+ return storage;
+ }
+
+ DatanodeDescriptor choose(DatanodeDescriptor client) {
+ // exact match for now
+ DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+ if (null == dn) {
+ dn = chooseRandom();
+ }
+ return dn;
+ }
+
+ DatanodeDescriptor choose(DatanodeDescriptor client,
+ List<String> excludedUUids) {
+ // exact match for now
+ DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+
+ if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
+ dn = null;
+ Set<String> exploredUUids = new HashSet<String>();
+
+ while(exploredUUids.size() < dns.size()) {
+ Map.Entry<String, DatanodeDescriptor> d =
+ dns.ceilingEntry(UUID.randomUUID().toString());
+ if (null == d) {
+ d = dns.firstEntry();
+ }
+ String uuid = d.getValue().getDatanodeUuid();
+ //this node has already been explored, and was not selected earlier
+ if (exploredUUids.contains(uuid)) {
+ continue;
+ }
+ exploredUUids.add(uuid);
+ //this node has been excluded
+ if (excludedUUids.contains(uuid)) {
+ continue;
+ }
+ return dns.get(uuid);
+ }
+ }
+
+ return dn;
+ }
+
+ DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
+ // TODO: Currently this is not uniformly random;
+ // skewed toward sparse sections of the ids
+ Set<DatanodeDescriptor> excludedNodes =
+ new HashSet<DatanodeDescriptor>();
+ if (excludedStorages != null) {
+ for (int i= 0; i < excludedStorages.length; i++) {
+ LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
+ excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
+ }
+ }
+ Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
+
+ while(exploredNodes.size() < dns.size()) {
+ Map.Entry<String, DatanodeDescriptor> d =
+ dns.ceilingEntry(UUID.randomUUID().toString());
+ if (null == d) {
+ d = dns.firstEntry();
+ }
+ DatanodeDescriptor node = d.getValue();
+ //this node has already been explored, and was not selected earlier
+ if (exploredNodes.contains(node)) {
+ continue;
+ }
+ exploredNodes.add(node);
+ //this node has been excluded
+ if (excludedNodes.contains(node)) {
+ continue;
+ }
+ return node;
+ }
+ return null;
+ }
+
+ DatanodeDescriptor chooseRandom() {
+ return chooseRandom(null);
+ }
+
+ @Override
+ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+ // pick a random datanode, delegate to it
+ DatanodeDescriptor node = chooseRandom(targets);
+ if (node != null) {
+ node.addBlockToBeReplicated(block, targets);
+ } else {
+ LOG.error("Cannot find a source node to replicate block: "
+ + block + " from");
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (this == obj) || super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ /**
+ * Used to emulate block reports for provided blocks.
+ */
+ static class ProvidedBlockList extends BlockListAsLongs {
+
+ private final Iterator<Block> inner;
+
+ ProvidedBlockList(Iterator<Block> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public Iterator<BlockReportReplica> iterator() {
+ return new Iterator<BlockReportReplica>() {
+ @Override
+ public BlockReportReplica next() {
+ return new BlockReportReplica(inner.next());
+ }
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public int getNumberOfBlocks() {
+ // VERIFY: only printed for debugging
+ return -1;
+ }
+
+ @Override
+ public ByteString getBlocksBuffer() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long[] getBlockListAsLongs() {
+ // should only be used for backwards compat, DN.ver > NN.ver
+ throw new UnsupportedOperationException();
+ }
+ }
+}
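chooseRandom() above selects a datanode by probing the ConcurrentSkipListMap with a fresh random UUID and taking the ceiling entry, wrapping to firstEntry() when the probe lands past the last key; as the TODO notes, this skews toward keys that follow large gaps in the key space. The idiom in isolation, as a runnable sketch:

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class CeilingPickDemo {
      public static void main(String[] args) {
        ConcurrentSkipListMap<String, String> dns = new ConcurrentSkipListMap<>();
        for (int i = 0; i < 4; i++) {
          dns.put(UUID.randomUUID().toString(), "dn-" + i);
        }
        // Probe with a random key; wrap around when nothing is >= probe.
        Map.Entry<String, String> e =
            dns.ceilingEntry(UUID.randomUUID().toString());
        if (e == null) {
          e = dns.firstEntry();
        }
        System.out.println("picked " + e.getValue());
      }
    }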
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 6df243f..0b8063c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4461,14 +4461,30 @@
</property>
<property>
+ <name>dfs.namenode.provided.enabled</name>
+ <value>false</value>
+ <description>
+ Enables the Namenode to handle provided storages.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.namenode.block.provider.class</name>
+ <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value>
+ <description>
+ The class that is used to load provided blocks in the Namenode.
+ </description>
+ </property>
+
+ <property>
<name>dfs.provider.class</name>
<value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
<description>
- The class that is used to load information about blocks stored in
- provided storages.
- org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
- is used as the default, which expects the blocks to be specified
- using a delimited text file.
+ The class that is used to load information about blocks stored in
+ provided storages.
+ org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
+ is used as the default, which expects the blocks to be specified
+ using a delimited text file.
</description>
</property>
@@ -4476,7 +4492,7 @@
<name>dfs.provided.df.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
<description>
- The class that is used to measure usage statistics of provided stores.
+ The class that is used to measure usage statistics of provided stores.
</description>
</property>
@@ -4484,7 +4500,7 @@
<name>dfs.provided.storage.id</name>
<value>DS-PROVIDED</value>
<description>
- The storage ID used for provided stores.
+ The storage ID used for provided stores.
</description>
</property>
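The same settings can be applied programmatically, as the new TestNameNodeProvidedImplementation below does in its setup; a fragment, assuming an HdfsConfiguration instance:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    // "DS-PROVIDED" is the documented default storage ID.
    conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
        DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);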
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 3a8fb59..12045a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -84,6 +84,7 @@ public class TestBlockStoragePolicy {
static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
+ static final byte PROVIDED = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
@Test (timeout=300000)
public void testConfigKeyEnabled() throws IOException {
@@ -143,6 +144,9 @@ public class TestBlockStoragePolicy {
expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
", storageTypes=[SSD], creationFallbacks=[DISK], " +
"replicationFallbacks=[DISK]}");
+ expectedPolicyStrings.put(PROVIDED, "BlockStoragePolicy{PROVIDED:" + PROVIDED +
+ ", storageTypes=[PROVIDED, DISK], creationFallbacks=[PROVIDED, DISK], " +
+ "replicationFallbacks=[PROVIDED, DISK]}");
for(byte i = 1; i < 16; i++) {
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 30e2aaf..dd24311 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -25,6 +25,7 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -291,7 +292,7 @@ public class TestDatanodeManager {
*/
@Test
public void testSortLocatedBlocks() throws IOException, URISyntaxException {
- HelperFunction(null);
+ HelperFunction(null, 0);
}
/**
@@ -303,7 +304,7 @@ public class TestDatanodeManager {
*/
@Test
public void testgoodScript() throws IOException, URISyntaxException {
- HelperFunction("/" + Shell.appendScriptExtension("topology-script"));
+ HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 0);
}
@@ -316,7 +317,21 @@ public class TestDatanodeManager {
*/
@Test
public void testBadScript() throws IOException, URISyntaxException {
- HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
+ HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), 0);
+ }
+
+ /**
+ * Test with different sorting functions but include datanodes
+ * with provided storage.
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testWithProvidedTypes() throws IOException, URISyntaxException {
+ HelperFunction(null, 1);
+ HelperFunction(null, 3);
+ HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 1);
+ HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 2);
}
/**
@@ -324,11 +339,12 @@ public class TestDatanodeManager {
* we invoke this function with and without topology scripts
*
* @param scriptFileName - Script Name or null
+ * @param providedStorages - number of provided storages to add
*
* @throws URISyntaxException
* @throws IOException
*/
- public void HelperFunction(String scriptFileName)
+ public void HelperFunction(String scriptFileName, int providedStorages)
throws URISyntaxException, IOException {
// create the DatanodeManager which will be tested
Configuration conf = new Configuration();
@@ -343,17 +359,25 @@ public class TestDatanodeManager {
}
DatanodeManager dm = mockDatanodeManager(fsn, conf);
+ int totalDNs = 5 + providedStorages;
+
// register the datanodes, each with a different storage ID and type
- DatanodeInfo[] locs = new DatanodeInfo[5];
- String[] storageIDs = new String[5];
- StorageType[] storageTypes = new StorageType[]{
- StorageType.ARCHIVE,
- StorageType.DEFAULT,
- StorageType.DISK,
- StorageType.RAM_DISK,
- StorageType.SSD
- };
- for (int i = 0; i < 5; i++) {
+ DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+ String[] storageIDs = new String[totalDNs];
+ List<StorageType> storageTypesList = new ArrayList<>(
+ Arrays.asList(StorageType.ARCHIVE,
+ StorageType.DEFAULT,
+ StorageType.DISK,
+ StorageType.RAM_DISK,
+ StorageType.SSD));
+
+ for (int i = 0; i < providedStorages; i++) {
+ storageTypesList.add(StorageType.PROVIDED);
+ }
+
+ StorageType[] storageTypes = storageTypesList.toArray(new StorageType[0]);
+
+ for (int i = 0; i < totalDNs; i++) {
// register new datanode
String uuid = "UUID-" + i;
String ip = "IP-" + i;
@@ -389,9 +413,9 @@ public class TestDatanodeManager {
DatanodeInfo[] sortedLocs = block.getLocations();
storageIDs = block.getStorageIDs();
storageTypes = block.getStorageTypes();
- assertThat(sortedLocs.length, is(5));
- assertThat(storageIDs.length, is(5));
- assertThat(storageTypes.length, is(5));
+ assertThat(sortedLocs.length, is(totalDNs));
+ assertThat(storageIDs.length, is(totalDNs));
+ assertThat(storageTypes.length, is(totalDNs));
for (int i = 0; i < sortedLocs.length; i++) {
assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageID(),
is(storageIDs[i]));
@@ -405,6 +429,14 @@ public class TestDatanodeManager {
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertThat(sortedLocs[sortedLocs.length - 2].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+ // check that the StorageType of the datanodes immediately
+ // preceding the decommissioned datanodes is PROVIDED
+ for (int i = 0; i < providedStorages; i++) {
+ assertThat(
+ ((DatanodeInfoWithStorage)
+ sortedLocs[sortedLocs.length - 3 - i]).getStorageType(),
+ is(StorageType.PROVIDED));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77dc3a0f/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
new file mode 100644
index 0000000..3b75806
--- /dev/null
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider;
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
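+/**
+ * Tests for namenode support of data stored in external (PROVIDED)
+ * stores: loading an FSImage built from an external namespace,
+ * block loading, replication to local storage, and end-to-end reads.
+ */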
+public class TestNameNodeProvidedImplementation {
+
+ @Rule public TestName name = new TestName();
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class);
+
+ final Random r = new Random();
+ final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
+ final Path BASE = new Path(fBASE.toURI().toString());
+ final Path NAMEPATH = new Path(BASE, "providedDir");
+ final Path NNDIRPATH = new Path(BASE, "nnDir");
+ final Path BLOCKFILE = new Path(NNDIRPATH, "blocks.csv");
+ final String SINGLEUSER = "usr1";
+ final String SINGLEGROUP = "grp1";
+
+ Configuration conf;
+ MiniDFSCluster cluster;
+
+ @Before
+ public void setSeed() throws Exception {
+ if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) {
+ throw new IOException("Could not fully delete " + fBASE);
+ }
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info(name.getMethodName() + " seed: " + seed);
+ conf = new HdfsConfiguration();
+ conf.set(SingleUGIResolver.USER, SINGLEUSER);
+ conf.set(SingleUGIResolver.GROUP, SINGLEGROUP);
+
+ conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+ DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
+
+ conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
+ BlockFormatProvider.class, BlockProvider.class);
+ conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+ TextFileRegionProvider.class, FileRegionProvider.class);
+ conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+ TextFileRegionFormat.class, BlockFormat.class);
+
+ conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH,
+ BLOCKFILE.toString());
+ conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
+ BLOCKFILE.toString());
+ conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
+
+ File imageDir = new File(NAMEPATH.toUri());
+ if (!imageDir.exists()) {
+ LOG.info("Creating directory: " + imageDir);
+ imageDir.mkdirs();
+ }
+
+ File nnDir = new File(NNDIRPATH.toUri());
+ if (!nnDir.exists()) {
+ nnDir.mkdirs();
+ }
+
+ // create 10 files of increasing length (10*i bytes) under NAMEPATH
+ for (int i = 0; i < 10; i++) {
+ File newFile = new File(new Path(NAMEPATH, "file" + i).toUri());
+ if (!newFile.exists()) {
+ LOG.info("Creating " + newFile.toString());
+ newFile.createNewFile();
+ // let any IOException fail setup rather than be swallowed silently
+ try (Writer writer = new OutputStreamWriter(
+ new FileOutputStream(newFile.getAbsolutePath()), "utf-8")) {
+ for (int j = 0; j < 10 * i; j++) {
+ writer.write("0");
+ }
+ writer.flush();
+ }
+ }
+ }
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ try {
+ if (cluster != null) {
+ cluster.shutdown(true, true);
+ }
+ } finally {
+ cluster = null;
+ }
+ }
+
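+ /**
+ * Write an FSImage for the given tree walk to out; blockIdsClass
+ * resolves the files encountered into (provided) blocks.
+ */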
+ void createImage(TreeWalk t, Path out,
+ Class<? extends BlockResolver> blockIdsClass) throws Exception {
+ ImageWriter.Options opts = ImageWriter.defaults();
+ opts.setConf(conf);
+ opts.output(out.toString())
+ .blocks(TextFileRegionFormat.class)
+ .blockIds(blockIdsClass);
+ try (ImageWriter w = new ImageWriter(opts)) {
+ for (TreePath e : t) {
+ w.accept(e);
+ }
+ }
+ }
+
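+ /**
+ * Start a MiniDFSCluster over the image under nspath.
+ * storageTypesPerDatanode takes precedence over storageTypes;
+ * when both are null, the MiniDFSCluster builder defaults apply.
+ */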
+ void startCluster(Path nspath, int numDatanodes,
+ StorageType[] storageTypes,
+ StorageType[][] storageTypesPerDatanode)
+ throws IOException {
+ conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
+
+ if (storageTypesPerDatanode != null) {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .format(false)
+ .manageNameDfsDirs(false)
+ .numDataNodes(numDatanodes)
+ .storageTypes(storageTypesPerDatanode)
+ .build();
+ } else if (storageTypes != null) {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .format(false)
+ .manageNameDfsDirs(false)
+ .numDataNodes(numDatanodes)
+ .storagesPerDatanode(storageTypes.length)
+ .storageTypes(storageTypes)
+ .build();
+ } else {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .format(false)
+ .manageNameDfsDirs(false)
+ .numDataNodes(numDatanodes)
+ .build();
+ }
+ cluster.waitActive();
+ }
+
+ @Test(timeout = 20000)
+ public void testLoadImage() throws Exception {
+ final long seed = r.nextLong();
+ LOG.info("NAMEPATH: " + NAMEPATH);
+ createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
+ startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED}, null);
+
+ FileSystem fs = cluster.getFileSystem();
+ for (TreePath e : new RandomTreeWalk(seed)) {
+ FileStatus rs = e.getFileStatus();
+ Path hp = new Path(rs.getPath().toUri().getPath());
+ assertTrue(fs.exists(hp));
+ FileStatus hs = fs.getFileStatus(hp);
+ assertEquals(rs.getPath().toUri().getPath(),
+ hs.getPath().toUri().getPath());
+ assertEquals(rs.getPermission(), hs.getPermission());
+ assertEquals(rs.getLen(), hs.getLen());
+ assertEquals(SINGLEUSER, hs.getOwner());
+ assertEquals(SINGLEGROUP, hs.getGroup());
+ assertEquals(rs.getAccessTime(), hs.getAccessTime());
+ assertEquals(rs.getModificationTime(), hs.getModificationTime());
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testBlockLoad() throws Exception {
+ conf.setClass(ImageWriter.Options.UGI_CLASS,
+ SingleUGIResolver.class, UGIResolver.class);
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockResolver.class);
+ startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED}, null);
+ }
+
+ @Test(timeout = 500000)
+ public void testDefaultReplication() throws Exception {
+ int targetReplication = 2;
+ conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockMultiReplicaResolver.class);
+ // configure the last datanode with only DISK storage
+ startCluster(NNDIRPATH, 3, null,
+ new StorageType[][] {
+ {StorageType.PROVIDED},
+ {StorageType.PROVIDED},
+ {StorageType.DISK}}
+ );
+ // wait for the replication to finish
+ Thread.sleep(50000);
+
+ FileSystem fs = cluster.getFileSystem();
+ int count = 0;
+ for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
+ FileStatus rs = e.getFileStatus();
+ Path hp = removePrefix(NAMEPATH, rs.getPath());
+ LOG.info("hp " + hp.toUri().getPath());
+ // skip HDFS-specific files, which may have been created later on
+ if (hp.toString().contains("in_use.lock")
+ || hp.toString().contains("current")) {
+ continue;
+ }
+ e.accept(count++);
+ assertTrue(fs.exists(hp));
+ FileStatus hs = fs.getFileStatus(hp);
+
+ if (rs.isFile()) {
+ BlockLocation[] bl = fs.getFileBlockLocations(
+ hs.getPath(), 0, hs.getLen());
+ for (BlockLocation loc : bl) {
+ int currentRep = loc.getHosts().length;
+ assertEquals(targetReplication, currentRep);
+ }
+ }
+ }
+ }
+
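+ /**
+ * Return the suffix of walk relative to base, rooted at "/".
+ * Throws IllegalArgumentException if base is not a prefix of walk.
+ */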
+ static Path removePrefix(Path base, Path walk) {
+ Path wpath = new Path(walk.toUri().getPath());
+ Path bpath = new Path(base.toUri().getPath());
+ Path ret = new Path("/");
+ while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) {
+ ret = "".equals(ret.getName())
+ ? new Path("/", wpath.getName())
+ : new Path(new Path("/", wpath.getName()),
+ new Path(ret.toString().substring(1)));
+ wpath = wpath.getParent();
+ }
+ if (!bpath.equals(wpath)) {
+ throw new IllegalArgumentException(base + " not a prefix of " + walk);
+ }
+ return ret;
+ }
+
+ @Test(timeout = 30000)
+ public void testBlockRead() throws Exception {
+ conf.setClass(ImageWriter.Options.UGI_CLASS,
+ FsUGIResolver.class, UGIResolver.class);
+ createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+ FixedBlockResolver.class);
+ startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED}, null);
+ FileSystem fs = cluster.getFileSystem();
+ // give the cluster a moment to process block reports
+ Thread.sleep(2000);
+ int count = 0;
+ // read NN metadata, verify contents match
+ for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
+ FileStatus rs = e.getFileStatus();
+ Path hp = removePrefix(NAMEPATH, rs.getPath());
+ LOG.info("hp " + hp.toUri().getPath());
+ // skip HDFS-specific files, which may have been created later on
+ if (hp.toString().contains("in_use.lock")
+ || hp.toString().contains("current")) {
+ continue;
+ }
+ e.accept(count++);
+ assertTrue(fs.exists(hp));
+ FileStatus hs = fs.getFileStatus(hp);
+ assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath());
+ assertEquals(rs.getPermission(), hs.getPermission());
+ assertEquals(rs.getOwner(), hs.getOwner());
+ assertEquals(rs.getGroup(), hs.getGroup());
+
+ if (rs.isFile()) {
+ assertEquals(rs.getLen(), hs.getLen());
+ try (ReadableByteChannel i = Channels.newChannel(
+ new FileInputStream(new File(rs.getPath().toUri())))) {
+ try (ReadableByteChannel j = Channels.newChannel(
+ fs.open(hs.getPath()))) {
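+ // compare the local source file and its HDFS image
+ // byte-for-byte, 4KB at a time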
+ ByteBuffer ib = ByteBuffer.allocate(4096);
+ ByteBuffer jb = ByteBuffer.allocate(4096);
+ while (true) {
+ int il = i.read(ib);
+ int jl = j.read(jb);
+ if (il < 0 || jl < 0) {
+ assertEquals(il, jl);
+ break;
+ }
+ ib.flip();
+ jb.flip();
+ int cmp = Math.min(ib.remaining(), jb.remaining());
+ for (int k = 0; k < cmp; ++k) {
+ assertEquals(ib.get(), jb.get());
+ }
+ ib.compact();
+ jb.compact();
+ }
+
+ }
+ }
+ }
+ }
+ }
+}
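
For reference, the provided-storage path exercised above is wired purely
through configuration. A minimal sketch of that wiring, reusing only the
DFSConfigKeys constants and classes introduced by this commit
("blockMapPath" is a placeholder for a real block-map location, not a
value defined here):

    Configuration conf = new HdfsConfiguration();
    // enable namenode support for PROVIDED storage
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    // namenode-side source of block metadata for provided files
    conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
        BlockFormatProvider.class, BlockProvider.class);
    // provider and on-disk format of the block map
    conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
        TextFileRegionProvider.class, FileRegionProvider.class);
    conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
        TextFileRegionFormat.class, BlockFormat.class);
    // comma-delimited block map, read from and written to blockMapPath
    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, blockMapPath);
    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, blockMapPath);
    conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");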