Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/16 02:10:53 UTC
[26/46] hadoop git commit: HDFS-12777. [READ] Reduce memory and CPU footprint for PROVIDED volumes.
HDFS-12777. [READ] Reduce memory and CPU footprint for PROVIDED volumes.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1a28f95
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1a28f95
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1a28f95
Branch: refs/heads/trunk
Commit: e1a28f95b8ffcb86300148f10a23b710f8388341
Parents: 6cd80b2
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Nov 10 10:19:33 2017 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 15 17:51:39 2017 -0800
----------------------------------------------------------------------
.../hdfs/server/datanode/DirectoryScanner.java | 4 +
.../datanode/FinalizedProvidedReplica.java | 8 ++
.../hdfs/server/datanode/ProvidedReplica.java | 77 +++++++++++++++++++-
.../hdfs/server/datanode/ReplicaBuilder.java | 37 +++++++++-
.../fsdataset/impl/ProvidedVolumeImpl.java | 30 +++++++-
.../fsdataset/impl/TestProvidedImpl.java | 76 ++++++++++++-------
6 files changed, 196 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
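The memory saving in this commit comes from no longer storing a full URI object
per provided replica: each replica keeps only a short suffix String, the shared
base path of the volume is stored once as a Path, and the full location is
rebuilt on demand. (The CPU saving comes from skipping PROVIDED volumes in the
DirectoryScanner, in the first hunk below.) A minimal sketch of the idea, with
illustrative paths rather than the actual Hadoop values:

    // One shared prefix per PROVIDED volume (hypothetical values).
    Path volumePrefix = new Path("hdfs://remote/store");
    // Per replica, only a short suffix String is retained.
    String replicaSuffix = "dir0/blk_1001";
    // The full URI is reconstructed only when the block is accessed.
    URI blockURI = new Path(volumePrefix, replicaSuffix).toUri();
    // => hdfs://remote/store/dir0/blk_1001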
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 3b6d06c..8fb8551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -530,6 +530,10 @@ public class DirectoryScanner implements Runnable {
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) {
+ if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
+ // Disable scanning PROVIDED volumes to keep overhead low
+ continue;
+ }
ReportCompiler reportCompiler =
new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
index e23d6be..bcc9a38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
@@ -21,6 +21,7 @@ import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -37,6 +38,13 @@ public class FinalizedProvidedReplica extends ProvidedReplica {
remoteFS);
}
+ public FinalizedProvidedReplica(long blockId, Path pathPrefix,
+ String pathSuffix, long fileOffset, long blockLen, long genStamp,
+ FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) {
+ super(blockId, pathPrefix, pathSuffix, fileOffset, blockLen,
+ genStamp, volume, conf, remoteFS);
+ }
+
@Override
public ReplicaState getState() {
return ReplicaState.FINALIZED;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
index 2b3bd13..8681421 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -51,18 +52,23 @@ public abstract class ProvidedReplica extends ReplicaInfo {
static final byte[] NULL_CHECKSUM_ARRAY =
FsDatasetUtil.createNullChecksumByteArray();
private URI fileURI;
+ private Path pathPrefix;
+ private String pathSuffix;
private long fileOffset;
private Configuration conf;
private FileSystem remoteFS;
/**
* Constructor.
+ *
* @param blockId block id
* @param fileURI remote URI this block is to be read from
* @param fileOffset the offset in the remote URI
* @param blockLen the length of the block
* @param genStamp the generation stamp of the block
* @param volume the volume this block belongs to
+ * @param conf the configuration
+ * @param remoteFS reference to the remote filesystem to use for this replica.
*/
public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf,
@@ -85,23 +91,86 @@ public abstract class ProvidedReplica extends ReplicaInfo {
}
}
+ /**
+ * Constructor.
+ *
+ * @param blockId block id
+ * @param pathPrefix A prefix of the {@link Path} associated with this replica
+ * on the remote {@link FileSystem}.
+ * @param pathSuffix A suffix of the {@link Path} associated with this replica
+ * on the remote {@link FileSystem}. Resolving the {@code pathSuffix}
+ * against the {@code pathPrefix} should provide the exact
+ * {@link Path} of the data associated with this replica on the
+ * remote {@link FileSystem}.
+ * @param fileOffset the offset in the remote URI
+ * @param blockLen the length of the block
+ * @param genStamp the generation stamp of the block
+ * @param volume the volume this block belongs to
+ * @param conf the configuration
+ * @param remoteFS reference to the remote filesystem to use for this replica.
+ */
+ public ProvidedReplica(long blockId, Path pathPrefix, String pathSuffix,
+ long fileOffset, long blockLen, long genStamp, FsVolumeSpi volume,
+ Configuration conf, FileSystem remoteFS) {
+ super(volume, blockId, blockLen, genStamp);
+ this.fileURI = null;
+ this.pathPrefix = pathPrefix;
+ this.pathSuffix = pathSuffix;
+ this.fileOffset = fileOffset;
+ this.conf = conf;
+ if (remoteFS != null) {
+ this.remoteFS = remoteFS;
+ } else {
+ LOG.warn(
+ "Creating an reference to the remote FS for provided block " + this);
+ try {
+ this.remoteFS = FileSystem.get(pathPrefix.toUri(), this.conf);
+ } catch (IOException e) {
+ LOG.warn("Failed to obtain filesystem for " + pathPrefix);
+ this.remoteFS = null;
+ }
+ }
+ }
+
public ProvidedReplica(ProvidedReplica r) {
super(r);
this.fileURI = r.fileURI;
this.fileOffset = r.fileOffset;
this.conf = r.conf;
this.remoteFS = r.remoteFS;
+ this.pathPrefix = r.pathPrefix;
+ this.pathSuffix = r.pathSuffix;
}
@Override
public URI getBlockURI() {
- return this.fileURI;
+ return getRemoteURI();
+ }
+
+ @VisibleForTesting
+ public String getPathSuffix() {
+ return pathSuffix;
+ }
+
+ @VisibleForTesting
+ public Path getPathPrefix() {
+ return pathPrefix;
+ }
+
+ private URI getRemoteURI() {
+ if (fileURI != null) {
+ return fileURI;
+ } else if (pathPrefix == null) {
+ return new Path(pathSuffix).toUri();
+ } else {
+ return new Path(pathPrefix, pathSuffix).toUri();
+ }
}
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
if (remoteFS != null) {
- FSDataInputStream ins = remoteFS.open(new Path(fileURI));
+ FSDataInputStream ins = remoteFS.open(new Path(getRemoteURI()));
ins.seek(fileOffset + seekOffset);
return new BoundedInputStream(
new FSDataInputStream(ins), getBlockDataLength());
@@ -132,7 +201,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
public boolean blockDataExists() {
if(remoteFS != null) {
try {
- return remoteFS.exists(new Path(fileURI));
+ return remoteFS.exists(new Path(getRemoteURI()));
} catch (IOException e) {
return false;
}
@@ -220,7 +289,7 @@ public abstract class ProvidedReplica extends ReplicaInfo {
public int compareWith(ScanInfo info) {
//local scanning cannot find any provided blocks.
if (info.getFileRegion().equals(
- new FileRegion(this.getBlockId(), new Path(fileURI),
+ new FileRegion(this.getBlockId(), new Path(getRemoteURI()),
fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
return 0;
} else {
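For reference, the three-way resolution in getRemoteURI() above behaves as
follows (a sketch assuming standard org.apache.hadoop.fs.Path semantics, with
illustrative values):

    Path prefix = new Path("s3a://bucket/base");
    String suffix = "dir1/blk_1002";
    // fileURI set (legacy constructor): returned directly.
    // pathPrefix set: the suffix is resolved against it.
    new Path(prefix, suffix).toUri();  // s3a://bucket/base/dir1/blk_1002
    // pathPrefix null: the suffix alone becomes the path.
    new Path(suffix).toUri();          // dir1/blk_1002 (relative)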
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index c5cb6a5..de68e2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -52,6 +53,8 @@ public class ReplicaBuilder {
private Configuration conf;
private FileRegion fileRegion;
private FileSystem remoteFS;
+ private String pathSuffix;
+ private Path pathPrefix;
public ReplicaBuilder(ReplicaState state) {
volume = null;
@@ -145,6 +148,28 @@ public class ReplicaBuilder {
return this;
}
+ /**
+ * Set the suffix of the {@link Path} associated with the replica.
+ * Intended to be used only for {@link ProvidedReplica}s.
+ * @param suffix the path suffix.
+ * @return the builder with the path suffix set.
+ */
+ public ReplicaBuilder setPathSuffix(String suffix) {
+ this.pathSuffix = suffix;
+ return this;
+ }
+
+ /**
+ * Set the prefix of the {@link Path} associated with the replica.
+ * Intended to be used only for {@link ProvidedReplica}s.
+ * @param prefix the path prefix.
+ * @return the builder with the path prefix set.
+ */
+ public ReplicaBuilder setPathPrefix(Path prefix) {
+ this.pathPrefix = prefix;
+ return this;
+ }
+
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
throws IllegalArgumentException {
LocalReplicaInPipeline info = null;
@@ -275,14 +300,20 @@ public class ReplicaBuilder {
throw new IllegalArgumentException("Finalized PROVIDED replica " +
"cannot be constructed from another replica");
}
- if (fileRegion == null && uri == null) {
+ if (fileRegion == null && uri == null &&
+ (pathPrefix == null || pathSuffix == null)) {
throw new IllegalArgumentException(
"Trying to construct a provided replica on " + volume +
" without enough information");
}
if (fileRegion == null) {
- info = new FinalizedProvidedReplica(blockId, uri, offset,
- length, genStamp, volume, conf, remoteFS);
+ if (uri != null) {
+ info = new FinalizedProvidedReplica(blockId, uri, offset,
+ length, genStamp, volume, conf, remoteFS);
+ } else {
+ info = new FinalizedProvidedReplica(blockId, pathPrefix, pathSuffix,
+ offset, length, genStamp, volume, conf, remoteFS);
+ }
} else {
info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
fileRegion.getPath().toUri(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index 092672d..d103b64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -65,6 +66,29 @@ import org.apache.hadoop.util.Time;
*/
public class ProvidedVolumeImpl extends FsVolumeImpl {
+ /**
+ * Get a suffix of the full path, excluding the given prefix.
+ *
+ * @param prefix a prefix of the path.
+ * @param fullPath the full path whose suffix is needed.
+ * @return the suffix of the path, which when resolved against {@code prefix}
+ * gets back the {@code fullPath}.
+ */
+ @VisibleForTesting
+ protected static String getSuffix(final Path prefix, final Path fullPath) {
+ String prefixStr = prefix.toString();
+ String pathStr = fullPath.toString();
+ if (!pathStr.startsWith(prefixStr)) {
+ LOG.debug("Path {} is not a prefix of the path {}", prefix, fullPath);
+ return pathStr;
+ }
+ String suffix = pathStr.replaceFirst("^" + prefixStr, "");
+ if (suffix.startsWith("/")) {
+ suffix = suffix.substring(1);
+ }
+ return suffix;
+ }
+
static class ProvidedBlockPoolSlice {
private ProvidedVolumeImpl providedVolume;
@@ -106,15 +130,19 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
return;
}
Iterator<FileRegion> iter = reader.iterator();
+ Path blockPrefixPath = new Path(providedVolume.getBaseURI());
while (iter.hasNext()) {
FileRegion region = iter.next();
if (region.getBlockPoolId() != null
&& region.getBlockPoolId().equals(bpid)
&& containsBlock(providedVolume.baseURI,
region.getPath().toUri())) {
+ String blockSuffix =
+ getSuffix(blockPrefixPath, new Path(region.getPath().toUri()));
ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
.setBlockId(region.getBlock().getBlockId())
- .setURI(region.getPath().toUri())
+ .setPathPrefix(blockPrefixPath)
+ .setPathSuffix(blockSuffix)
.setOffset(region.getOffset())
.setLength(region.getBlock().getNumBytes())
.setGenerationStamp(region.getBlock().getGenerationStamp())
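getSuffix() is the inverse operation used above while building the volume map:
it strips the volume's base path from each FileRegion path so that only the
short remainder is stored per replica. Roughly, with illustrative paths
(mirroring the cases in testProvidedReplicaSuffixExtraction below):

    Path base = new Path("file:///data/provided");
    Path full = new Path("file:///data/provided/dir0/blk_1001");
    ProvidedVolumeImpl.getSuffix(base, full);  // returns "dir0/blk_1001"
    // If base is not actually a prefix of full, the full path string is
    // returned unchanged and a debug message is logged.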
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1a28f95/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index 40d77f7a..ecab06b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -62,7 +62,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -509,33 +509,6 @@ public class TestProvidedImpl {
}
}
- @Test
- public void testRefresh() throws IOException {
- conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
- for (int i = 0; i < providedVolumes.size(); i++) {
- ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
- TestFileRegionBlockAliasMap testBlockFormat =
- (TestFileRegionBlockAliasMap) vol
- .getBlockFormat(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
- //equivalent to two new blocks appearing
- testBlockFormat.setBlockCount(NUM_PROVIDED_BLKS + 2);
- //equivalent to deleting the first block
- testBlockFormat.setMinBlkId(MIN_BLK_ID + 1);
-
- DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
- scanner.reconcile();
- ReplicaInfo info = dataset.getBlockReplica(
- BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
- //new replica should be added to the dataset
- assertTrue(info != null);
- try {
- info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
- } catch(Exception ex) {
- LOG.info("Exception expected: " + ex);
- }
- }
- }
-
private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
int minBlockId) throws IOException {
TestFileRegionIterator fileRegionIterator =
@@ -621,4 +594,51 @@ public class TestProvidedImpl {
ProvidedVolumeImpl.containsBlock(new URI("/bucket1/dir1/"),
new URI("s3a:/bucket1/dir1/temp.txt")));
}
+
+ @Test
+ public void testProvidedReplicaSuffixExtraction() {
+ assertEquals("B.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("file:///A/"), new Path("file:///A/B.txt")));
+ assertEquals("B/C.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("file:///A/"), new Path("file:///A/B/C.txt")));
+ assertEquals("B/C/D.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("file:///A/"), new Path("file:///A/B/C/D.txt")));
+ assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("file:///A/B/C/"), new Path("file:///A/B/C/D.txt")));
+ assertEquals("file:/A/B/C/D.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("file:///X/B/C/"), new Path("file:///A/B/C/D.txt")));
+ assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("/A/B/C"), new Path("/A/B/C/D.txt")));
+ assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
+ new Path("/A/B/C/"), new Path("/A/B/C/D.txt")));
+
+ assertEquals("data/current.csv", ProvidedVolumeImpl.getSuffix(
+ new Path("wasb:///users/alice/"),
+ new Path("wasb:///users/alice/data/current.csv")));
+ assertEquals("current.csv", ProvidedVolumeImpl.getSuffix(
+ new Path("wasb:///users/alice/data"),
+ new Path("wasb:///users/alice/data/current.csv")));
+
+ assertEquals("wasb:/users/alice/data/current.csv",
+ ProvidedVolumeImpl.getSuffix(
+ new Path("wasb:///users/bob/"),
+ new Path("wasb:///users/alice/data/current.csv")));
+ }
+
+ @Test
+ public void testProvidedReplicaPrefix() throws Exception {
+ for (int i = 0; i < providedVolumes.size(); i++) {
+ FsVolumeImpl vol = providedVolumes.get(i);
+ ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ vol.getVolumeMap(volumeMap, null);
+
+ Path expectedPrefix = new Path(
+ StorageLocation.normalizeFileURI(new File(providedBasePath).toURI()));
+ for (ReplicaInfo info : volumeMap
+ .replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
+ ProvidedReplica pInfo = (ProvidedReplica) info;
+ assertEquals(expectedPrefix, pInfo.getPathPrefix());
+ }
+ }
+ }
}