Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/05/11 21:42:40 UTC
[44/50] [abbrv] hadoop git commit: HDFS-10675. Datanode support to read from external stores.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index adec209..15e71f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@@ -241,10 +242,11 @@ public interface FsVolumeSpi
private final FsVolumeSpi volume;
+ private final FileRegion fileRegion;
/**
* Get the file's length in async block scan
*/
- private final long blockFileLength;
+ private final long blockLength;
private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}");
@@ -294,13 +296,30 @@ public interface FsVolumeSpi
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol) {
+ this(blockId, blockFile, metaFile, vol, null,
+ (blockFile != null) ? blockFile.length() : 0);
+ }
+
+ /**
+ * Create a ScanInfo object for a block. This constructor will examine
+ * the block data and meta-data files.
+ *
+ * @param blockId the block ID
+ * @param blockFile the path to the block data file
+ * @param metaFile the path to the block meta-data file
+ * @param vol the volume that contains the block
+ * @param fileRegion the file region (for provided blocks)
+ * @param length the length of the block data
+ */
+ public ScanInfo(long blockId, File blockFile, File metaFile,
+ FsVolumeSpi vol, FileRegion fileRegion, long length) {
this.blockId = blockId;
String condensedVolPath =
(vol == null || vol.getBaseURI() == null) ? null :
getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
- this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
+ this.blockLength = length;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
@@ -310,6 +329,7 @@ public interface FsVolumeSpi
condensedVolPath + blockSuffix);
}
this.volume = vol;
+ this.fileRegion = fileRegion;
}
/**
@@ -328,8 +348,8 @@ public interface FsVolumeSpi
*
* @return the length of the data block
*/
- public long getBlockFileLength() {
- return blockFileLength;
+ public long getBlockLength() {
+ return blockLength;
}
/**
@@ -399,6 +419,10 @@ public interface FsVolumeSpi
getMetaFile().getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
}
+
+ public FileRegion getFileRegion() {
+ return fileRegion;
+ }
}
/**
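For illustration, a hedged sketch of how the two ScanInfo constructors differ after this change; blockId, blockFile, metaFile and vol are assumed to be in scope for a local replica, and region/providedVolume for a provided one (this mirrors compileReport in ProvidedVolumeImpl further down):

    // Local replica: the length is read from the block file on disk and
    // there is no FileRegion.
    ScanInfo localInfo = new ScanInfo(blockId, blockFile, metaFile, vol);

    // Provided replica: no local files exist, so the length and the
    // FileRegion describing the remote byte range are passed explicitly.
    ScanInfo providedInfo = new ScanInfo(region.getBlock().getBlockId(),
        null, null, providedVolume, region, region.getLength());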
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
new file mode 100644
index 0000000..24921c4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The default usage statistics for a provided volume.
+ */
+public class DefaultProvidedVolumeDF
+ implements ProvidedVolumeDF, Configurable {
+
+ @Override
+ public void setConf(Configuration conf) {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public long getCapacity() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long getSpaceUsed() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockPoolUsed(String bpid) {
+ return 0;
+ }
+
+ @Override
+ public long getAvailable() {
+ return Long.MAX_VALUE;
+ }
+}
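The default implementation above reports an effectively unbounded capacity and zero usage. As a hedged sketch (placeholder class name and numbers, not part of this patch), a deployment that knows the size of its external store could supply its own statistics and select them through dfs.provided.df.class:

    public class FixedSizeProvidedVolumeDF implements ProvidedVolumeDF {
      // Hypothetical fixed 10 TB external store, reported as fully used.
      private static final long CAPACITY = 10L * 1024 * 1024 * 1024 * 1024;

      @Override
      public long getCapacity() {
        return CAPACITY;
      }

      @Override
      public long getSpaceUsed() {
        return CAPACITY;
      }

      @Override
      public long getBlockPoolUsed(String bpid) {
        return CAPACITY;
      }

      @Override
      public long getAvailable() {
        return 0;
      }
    }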
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9a5002a..3463566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -1702,6 +1703,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Set<String> missingVolumesReported = new HashSet<>();
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+ //skip blocks in PROVIDED storage
+ if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
+ continue;
+ }
String volStorageID = b.getVolume().getStorageID();
if (!builders.containsKey(volStorageID)) {
if (!missingVolumesReported.contains(volStorageID)) {
@@ -1837,7 +1842,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try (AutoCloseableLock lock = datasetLock.acquire()) {
r = volumeMap.get(bpid, blockId);
}
-
if (r != null) {
if (r.blockDataExists()) {
return r;
@@ -2178,13 +2182,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param vol Volume of the block file
*/
@Override
- public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) throws IOException {
+ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
+ throws IOException {
+
+ long blockId = scanInfo.getBlockId();
+ File diskFile = scanInfo.getBlockFile();
+ File diskMetaFile = scanInfo.getMetaFile();
+ FsVolumeSpi vol = scanInfo.getVolume();
+
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
try (AutoCloseableLock lock = datasetLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
- if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
+ if (memBlockInfo != null &&
+ memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
return;
}
@@ -2199,6 +2210,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block.getGenerationStamp(diskMetaFile.getName()) :
HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+ if (vol.getStorageType() == StorageType.PROVIDED) {
+ if (memBlockInfo == null) {
+ //replica exists on provided store but not in memory
+ ReplicaInfo diskBlockInfo =
+ new ReplicaBuilder(ReplicaState.FINALIZED)
+ .setFileRegion(scanInfo.getFileRegion())
+ .setFsVolume(vol)
+ .setConf(conf)
+ .build();
+
+ volumeMap.add(bpid, diskBlockInfo);
+ LOG.warn("Added missing block to memory " + diskBlockInfo);
+ } else {
+ //replica exists in memory but not in the provided store
+ volumeMap.remove(bpid, blockId);
+ LOG.warn("Deleting missing provided block " + memBlockInfo);
+ }
+ return;
+ }
+
if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@@ -2970,7 +3001,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo =
replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
replicaInfo, replicaState);
-
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);
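The checkAndUpdate signature change means callers (and other FsDatasetSpi implementations, as the test updates below show) now pass a single ScanInfo instead of individual fields. A hedged sketch of the call-site migration, with dataset and scanInfo assumed to come from the directory scanner:

    // Before this patch: the scan result was passed field by field.
    // dataset.checkAndUpdate(bpid, blockId, diskFile, diskMetaFile, vol);

    // After this patch: the same information travels inside the ScanInfo,
    // which can also carry the FileRegion of a provided block.
    dataset.checkAndUpdate(bpid, scanInfo);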
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 32759c4..9f115a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@@ -32,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
/** Utility methods. */
@InterfaceAudience.Private
@@ -44,6 +48,22 @@ public class FsDatasetUtil {
return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
}
+ public static byte[] createNullChecksumByteArray() {
+ DataChecksum csum =
+ DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(out);
+ try {
+ BlockMetadataHeader.writeHeader(dataOut, csum);
+ dataOut.close();
+ } catch (IOException e) {
+ FsVolumeImpl.LOG.error(
+ "Exception in creating null checksum stream: " + e);
+ return null;
+ }
+ return out.toByteArray();
+ }
+
static File getOrigFile(File unlinkTmpFile) {
final String name = unlinkTmpFile.getName();
if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
@@ -135,8 +155,9 @@ public class FsDatasetUtil {
* Compute the checksum for a block file that does not already have
* its checksum computed, and save it to dstMeta file.
*/
- public static void computeChecksum(File srcMeta, File dstMeta, File blockFile,
- int smallBufferSize, Configuration conf) throws IOException {
+ public static void computeChecksum(File srcMeta, File dstMeta,
+ File blockFile, int smallBufferSize, Configuration conf)
+ throws IOException {
Preconditions.checkNotNull(srcMeta);
Preconditions.checkNotNull(dstMeta);
Preconditions.checkNotNull(blockFile);
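createNullChecksumByteArray produces the bytes of a block metadata header whose checksum type is NULL, which is useful when a provided block has no stored .meta file. A hedged usage sketch; wrapping the bytes in a LengthInputStream is an assumption about how a caller might consume them, not something this hunk does:

    byte[] header = FsDatasetUtil.createNullChecksumByteArray();
    if (header != null) {
      // Serve the header bytes as if they were the contents of a .meta file.
      LengthInputStream metaStream = new LengthInputStream(
          new java.io.ByteArrayInputStream(header), header.length);
    }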
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index b948fb7..267a5cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -155,18 +155,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.reservedForReplicas = new AtomicLong(0L);
this.storageLocation = sd.getStorageLocation();
this.currentDir = sd.getCurrentDir();
- File parent = currentDir.getParentFile();
- this.usage = new DF(parent, conf);
this.storageType = storageLocation.getStorageType();
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
+ "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.configuredCapacity = -1;
+ if (currentDir != null) {
+ File parent = currentDir.getParentFile();
+ this.usage = new DF(parent, conf);
+ cacheExecutor = initializeCacheExecutor(parent);
+ this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath());
+ } else {
+ this.usage = null;
+ cacheExecutor = null;
+ this.metrics = null;
+ }
this.conf = conf;
this.fileIoProvider = fileIoProvider;
- cacheExecutor = initializeCacheExecutor(parent);
- this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath());
}
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@@ -446,7 +452,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
/**
* Unplanned Non-DFS usage, i.e. Extra usage beyond reserved.
*
- * @return
+ * @return Disk usage excluding space used by HDFS and excluding space
+ * reserved for blocks open for write.
* @throws IOException
*/
public long getNonDfsUsed() throws IOException {
@@ -524,7 +531,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
}
-
+
/**
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index 427f81b..2da9170 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
@@ -67,6 +68,11 @@ public class FsVolumeImplBuilder {
}
FsVolumeImpl build() throws IOException {
+ if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+ return new ProvidedVolumeImpl(dataset, storageID, sd,
+ fileIoProvider != null ? fileIoProvider :
+ new FileIoProvider(null, null), conf);
+ }
return new FsVolumeImpl(
dataset, storageID, sd,
fileIoProvider != null ? fileIoProvider :
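The builder now dispatches on the storage type of the location: PROVIDED locations get a ProvidedVolumeImpl, everything else keeps the existing FsVolumeImpl path. A hedged sketch of the effect for a caller, assuming the builder's existing setters (setDataset, setStorageID, setStorageDirectory, setConf):

    FsVolumeImpl volume = new FsVolumeImplBuilder()
        .setDataset(dataset)
        .setStorageID(storageID)
        .setStorageDirectory(sd)
        .setConf(conf)
        .build();
    // volume is a ProvidedVolumeImpl when sd's location has
    // StorageType.PROVIDED, and a plain FsVolumeImpl otherwise.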
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
new file mode 100644
index 0000000..4d28883
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+/**
+ * This interface is used to define the usage statistics
+ * of the provided storage.
+ */
+public interface ProvidedVolumeDF {
+
+ long getCapacity();
+
+ long getSpaceUsed();
+
+ long getBlockPoolUsed(String bpid);
+
+ long getAvailable();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
new file mode 100644
index 0000000..a48e117
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.util.Timer;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+
+/**
+ * This class is used to create provided volumes.
+ */
+public class ProvidedVolumeImpl extends FsVolumeImpl {
+
+ static class ProvidedBlockPoolSlice {
+ private FsVolumeImpl providedVolume;
+
+ private FileRegionProvider provider;
+ private Configuration conf;
+ private String bpid;
+ private ReplicaMap bpVolumeMap;
+
+ ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
+ Configuration conf) {
+ this.providedVolume = volume;
+ bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+ Class<? extends FileRegionProvider> fmt =
+ conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+ TextFileRegionProvider.class, FileRegionProvider.class);
+ provider = ReflectionUtils.newInstance(fmt, conf);
+ this.conf = conf;
+ this.bpid = bpid;
+ bpVolumeMap.initBlockPool(bpid);
+ LOG.info("Created provider: " + provider.getClass());
+ }
+
+ FileRegionProvider getFileRegionProvider() {
+ return provider;
+ }
+
+ public void getVolumeMap(ReplicaMap volumeMap,
+ RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
+ Iterator<FileRegion> iter = provider.iterator();
+ while(iter.hasNext()) {
+ FileRegion region = iter.next();
+ if (region.getBlockPoolId() != null &&
+ region.getBlockPoolId().equals(bpid)) {
+ ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+ .setBlockId(region.getBlock().getBlockId())
+ .setURI(region.getPath().toUri())
+ .setOffset(region.getOffset())
+ .setLength(region.getBlock().getNumBytes())
+ .setGenerationStamp(region.getBlock().getGenerationStamp())
+ .setFsVolume(providedVolume)
+ .setConf(conf).build();
+
+ ReplicaInfo oldReplica =
+ volumeMap.get(bpid, newReplica.getBlockId());
+ if (oldReplica == null) {
+ volumeMap.add(bpid, newReplica);
+ bpVolumeMap.add(bpid, newReplica);
+ } else {
+ throw new IOException(
+ "A block with id " + newReplica.getBlockId() +
+ " already exists in the volumeMap");
+ }
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ return bpVolumeMap.replicas(bpid).size() == 0;
+ }
+
+ public void shutdown(BlockListAsLongs blocksListsAsLongs) {
+ //nothing to do!
+ }
+
+ public void compileReport(LinkedList<ScanInfo> report,
+ ReportCompiler reportCompiler)
+ throws IOException, InterruptedException {
+ /* refresh the provider and return the list of blocks found.
+ * the assumption here is that the block ids in the external
+ * block map, after the refresh, are consistent with those
+ * from before the refresh, i.e., for blocks which did not change,
+ * the ids remain the same.
+ */
+ provider.refresh();
+ Iterator<FileRegion> iter = provider.iterator();
+ while(iter.hasNext()) {
+ reportCompiler.throttle();
+ FileRegion region = iter.next();
+ if (region.getBlockPoolId().equals(bpid)) {
+ LOG.info("Adding ScanInfo for blkid " +
+ region.getBlock().getBlockId());
+ report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
+ providedVolume, region, region.getLength()));
+ }
+ }
+ }
+ }
+
+ private URI baseURI;
+ private final Map<String, ProvidedBlockPoolSlice> bpSlices =
+ new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
+
+ private ProvidedVolumeDF df;
+
+ ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
+ StorageDirectory sd, FileIoProvider fileIoProvider,
+ Configuration conf) throws IOException {
+ super(dataset, storageID, sd, fileIoProvider, conf);
+ assert getStorageLocation().getStorageType() == StorageType.PROVIDED:
+ "Only provided storages must use ProvidedVolume";
+
+ baseURI = getStorageLocation().getUri();
+ Class<? extends ProvidedVolumeDF> dfClass =
+ conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
+ DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
+ df = ReflectionUtils.newInstance(dfClass, conf);
+ }
+
+ @Override
+ public String[] getBlockPoolList() {
+ return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
+ }
+
+ @Override
+ public long getCapacity() {
+ if (configuredCapacity < 0) {
+ return df.getCapacity();
+ }
+ return configuredCapacity;
+ }
+
+ @Override
+ public long getDfsUsed() throws IOException {
+ return df.getSpaceUsed();
+ }
+
+ @Override
+ long getBlockPoolUsed(String bpid) throws IOException {
+ return df.getBlockPoolUsed(bpid);
+ }
+
+ @Override
+ public long getAvailable() throws IOException {
+ return df.getAvailable();
+ }
+
+ @Override
+ long getActualNonDfsUsed() throws IOException {
+ return df.getSpaceUsed();
+ }
+
+ @Override
+ public long getNonDfsUsed() throws IOException {
+ return 0L;
+ }
+
+ @Override
+ public URI getBaseURI() {
+ return baseURI;
+ }
+
+ @Override
+ public File getFinalizedDir(String bpid) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void reserveSpaceForReplica(long bytesToReserve) {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public void releaseReservedSpace(long bytesToRelease) {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ private static final ObjectWriter WRITER =
+ new ObjectMapper().writerWithDefaultPrettyPrinter();
+ private static final ObjectReader READER =
+ new ObjectMapper().reader(ProvidedBlockIteratorState.class);
+
+ private static class ProvidedBlockIteratorState {
+ ProvidedBlockIteratorState() {
+ iterStartMs = Time.now();
+ lastSavedMs = iterStartMs;
+ atEnd = false;
+ lastBlockId = -1;
+ }
+
+ // The wall-clock ms since the epoch at which this iterator was last saved.
+ @JsonProperty
+ private long lastSavedMs;
+
+ // The wall-clock ms since the epoch at which this iterator was created.
+ @JsonProperty
+ private long iterStartMs;
+
+ @JsonProperty
+ private boolean atEnd;
+
+ //The id of the last block read when the state of the iterator is saved.
+ //This implementation assumes that provided blocks are returned
+ //in sorted order of the block ids.
+ @JsonProperty
+ private long lastBlockId;
+ }
+
+ private class ProviderBlockIteratorImpl
+ implements FsVolumeSpi.BlockIterator {
+
+ private String bpid;
+ private String name;
+ private FileRegionProvider provider;
+ private Iterator<FileRegion> blockIterator;
+ private ProvidedBlockIteratorState state;
+
+ ProviderBlockIteratorImpl(String bpid, String name,
+ FileRegionProvider provider) {
+ this.bpid = bpid;
+ this.name = name;
+ this.provider = provider;
+ rewind();
+ }
+
+ @Override
+ public void close() throws IOException {
+ //No action needed
+ }
+
+ @Override
+ public ExtendedBlock nextBlock() throws IOException {
+ if (null == blockIterator || !blockIterator.hasNext()) {
+ return null;
+ }
+ FileRegion nextRegion = null;
+ while (null == nextRegion && blockIterator.hasNext()) {
+ FileRegion temp = blockIterator.next();
+ if (temp.getBlock().getBlockId() < state.lastBlockId) {
+ continue;
+ }
+ if (temp.getBlockPoolId().equals(bpid)) {
+ nextRegion = temp;
+ }
+ }
+ if (null == nextRegion) {
+ return null;
+ }
+ state.lastBlockId = nextRegion.getBlock().getBlockId();
+ return new ExtendedBlock(bpid, nextRegion.getBlock());
+ }
+
+ @Override
+ public boolean atEnd() {
+ return blockIterator != null ? !blockIterator.hasNext(): true;
+ }
+
+ @Override
+ public void rewind() {
+ blockIterator = provider.iterator();
+ state = new ProvidedBlockIteratorState();
+ }
+
+ @Override
+ public void save() throws IOException {
+ //We do not persist the state of this iterator anywhere, locally.
+ //We just re-scan provided volumes as necessary.
+ state.lastSavedMs = Time.now();
+ }
+
+ @Override
+ public void setMaxStalenessMs(long maxStalenessMs) {
+ //do not use max staleness
+ }
+
+ @Override
+ public long getIterStartMs() {
+ return state.iterStartMs;
+ }
+
+ @Override
+ public long getLastSavedMs() {
+ return state.lastSavedMs;
+ }
+
+ @Override
+ public String getBlockPoolId() {
+ return bpid;
+ }
+
+ public void load() throws IOException {
+ //on load, we just rewind the iterator for provided volumes.
+ rewind();
+ LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
+ bpid, name, WRITER.writeValueAsString(state));
+ }
+ }
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ return new ProviderBlockIteratorImpl(bpid, name,
+ bpSlices.get(bpid).getFileRegionProvider());
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
+ bpSlices.get(bpid).getFileRegionProvider());
+ iter.load();
+ return iter;
+ }
+
+ @Override
+ ReplicaInfo addFinalizedBlock(String bpid, Block b,
+ ReplicaInfo replicaInfo, long bytesReserved) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public VolumeCheckResult check(VolumeCheckContext ignored)
+ throws DiskErrorException {
+ return VolumeCheckResult.HEALTHY;
+ }
+
+ @Override
+ void getVolumeMap(ReplicaMap volumeMap,
+ final RamDiskReplicaTracker ramDiskReplicaMap)
+ throws IOException {
+ LOG.info("Creating volumemap for provided volume " + this);
+ for(ProvidedBlockPoolSlice s : bpSlices.values()) {
+ s.getVolumeMap(volumeMap, ramDiskReplicaMap);
+ }
+ }
+
+ private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid)
+ throws IOException {
+ ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+ if (bp == null) {
+ throw new IOException("block pool " + bpid + " is not found");
+ }
+ return bp;
+ }
+
+ @Override
+ void getVolumeMap(String bpid, ReplicaMap volumeMap,
+ final RamDiskReplicaTracker ramDiskReplicaMap)
+ throws IOException {
+ getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
+ }
+
+ @VisibleForTesting
+ FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
+ return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
+ }
+
+ @Override
+ public String toString() {
+ return this.baseURI.toString();
+ }
+
+ @Override
+ void addBlockPool(String bpid, Configuration conf) throws IOException {
+ addBlockPool(bpid, conf, null);
+ }
+
+ @Override
+ void addBlockPool(String bpid, Configuration conf, Timer timer)
+ throws IOException {
+ LOG.info("Adding block pool " + bpid +
+ " to volume with id " + getStorageID());
+ ProvidedBlockPoolSlice bp;
+ bp = new ProvidedBlockPoolSlice(bpid, this, conf);
+ bpSlices.put(bpid, bp);
+ }
+
+ void shutdown() {
+ if (cacheExecutor != null) {
+ cacheExecutor.shutdown();
+ }
+ Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet();
+ for (Entry<String, ProvidedBlockPoolSlice> entry : set) {
+ entry.getValue().shutdown(null);
+ }
+ }
+
+ @Override
+ void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
+ ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+ if (bp != null) {
+ bp.shutdown(blocksListsAsLongs);
+ }
+ bpSlices.remove(bpid);
+ }
+
+ @Override
+ boolean isBPDirEmpty(String bpid) throws IOException {
+ return getProvidedBlockPoolSlice(bpid).isEmpty();
+ }
+
+ @Override
+ void deleteBPDirectories(String bpid, boolean force) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException {
+ LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
+ //get the report from the appropriate block pool.
+ if(bpSlices.containsKey(bpid)) {
+ bpSlices.get(bpid).compileReport(report, reportCompiler);
+ }
+ return report;
+ }
+
+ @Override
+ public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+ long newGS, long estimateBlockLen) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+ ReplicaInfo temp) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public ReplicaInPipeline createTemporary(ExtendedBlock b)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+ String bpid, long newBlockId, long recoveryId, long newlength)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
+ ReplicaInfo replicaInfo, int smallBufferSize,
+ Configuration conf) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+
+ @Override
+ public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
+ long genStamp, ReplicaInfo replicaInfo, int smallBufferSize,
+ Configuration conf) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedVolume does not yet support writes");
+ }
+}
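ProvidedVolumeImpl resolves its FileRegionProvider reflectively from dfs.provider.class, so the external block map can come from any source that can enumerate FileRegions. A hedged sketch of a trivial provider backed by an in-memory list; whether FileRegionProvider is a class or an interface is not shown in this patch, so subclassing it and the four-argument FileRegion constructor (taken from the test below) are assumptions:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.FileRegionProvider;

    public class InMemoryFileRegionProvider extends FileRegionProvider {
      private final List<FileRegion> regions = new ArrayList<>();

      public InMemoryFileRegionProvider() {
        // Hypothetical single block: id 1001 stored at offset 0 of a remote file.
        regions.add(new FileRegion(1001L,
            new Path("file:///remote/data.bin"), 0, 1024));
      }

      @Override
      public Iterator<FileRegion> iterator() {
        return regions.iterator();
      }

      @Override
      public void refresh() {
        // Nothing to refresh for a static, in-memory block map.
      }
    }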
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 7eac87d..24ef80c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -685,7 +685,7 @@ public class Mover {
}
}
- static class Cli extends Configured implements Tool {
+ public static class Cli extends Configured implements Tool {
private static final String USAGE = "Usage: hdfs mover "
+ "[-p <files/dirs> | -f <local file>]"
+ "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate."
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
index 872ee74..45e001d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class FSImageCompression {
+public class FSImageCompression {
/** Codec to use to save or load image, or null if the image is not compressed */
private CompressionCodec imageCodec;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 63d1a28..4aae7d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -658,6 +658,10 @@ public class NNStorage extends Storage implements Closeable,
void readProperties(StorageDirectory sd, StartupOption startupOption)
throws IOException {
Properties props = readPropertiesFile(sd.getVersionFile());
+ if (props == null) {
+ throw new IOException(
+ "Properties not found for storage directory " + sd);
+ }
if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK
.matches(startupOption)) {
int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
@@ -975,7 +979,11 @@ public class NNStorage extends Storage implements Closeable,
StorageDirectory sd = sdit.next();
try {
Properties props = readPropertiesFile(sd.getVersionFile());
- cid = props.getProperty("clusterID");
+ if (props == null) {
+ cid = null;
+ } else {
+ cid = props.getProperty("clusterID");
+ }
LOG.info("current cluster id for sd="+sd.getCurrentDir() +
";lv=" + layoutVersion + ";cid=" + cid);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index f0f2220..6df243f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4461,6 +4461,84 @@
</property>
<property>
+ <name>dfs.provider.class</name>
+ <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
+ <description>
+ The class that is used to load information about blocks stored in
+ provided storages.
+ org.apache.hadoop.hdfs.server.common.TextFileRegionProvider
+ is used as the default, which expects the blocks to be specified
+ using a delimited text file.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.df.class</name>
+ <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
+ <description>
+ The class that is used to measure usage statistics of provided stores.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.storage.id</name>
+ <value>DS-PROVIDED</value>
+ <description>
+ The storage ID used for provided stores.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.blockformat.class</name>
+ <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
+ <description>
+ The class that is used to specify the input format of the blocks on
+ provided storages. The default is
+ org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
+ file regions to describe blocks. The file regions are specified as a
+ delimited text file. Each file region is a 6-tuple containing the
+ block id, remote file path, offset into file, length of block, the
+ block pool id containing the block, and the generation stamp of the
+ block.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.textprovider.delimiter</name>
+ <value>,</value>
+ <description>
+ The delimiter used when the provided block map is specified as
+ a text file.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.textprovider.read.path</name>
+ <value></value>
+ <description>
+ The path specifying the provided block map as a text file, specified as
+ a URI.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.textprovider.read.codec</name>
+ <value></value>
+ <description>
+ The codec used to de-compress the provided block map.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.provided.textprovider.write.path</name>
+ <value></value>
+ <description>
+ The path to which the provided block map should be written as a text
+ file, specified as a URI.
+ </description>
+ </property>
+
+ <property>
<name>dfs.lock.suppress.warning.interval</name>
<value>10s</value>
<description>Instrumentation reporting long critical sections will suppress
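Taken together, the new keys wire a datanode to a text-file block map. A hedged sketch of setting them programmatically; the path and delimiter are placeholders, and the record layout in the comment follows the field order given in the dfs.provided.blockformat.class description above:

    Configuration conf = new Configuration();
    conf.set("dfs.provider.class",
        "org.apache.hadoop.hdfs.server.common.TextFileRegionProvider");
    conf.set("dfs.provided.df.class",
        "org.apache.hadoop.hdfs.server.datanode.fsdataset.impl."
            + "DefaultProvidedVolumeDF");
    conf.set("dfs.provided.textprovider.delimiter", ",");
    // Each line of the block map is a 6-tuple:
    // <block id>,<remote path>,<offset>,<length>,<block pool id>,<gen stamp>
    conf.set("dfs.provided.textprovider.read.path",
        "file:///var/lib/hadoop/provided/blocks.csv");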
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
index 25eb5b6..8bc8b0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
@@ -208,7 +208,7 @@ public class TestDFSRollback {
UpgradeUtilities.createDataNodeVersionFile(
dataCurrentDirs,
storageInfo,
- UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
assertTrue(cluster.isDataNodeUp());
@@ -256,7 +256,7 @@ public class TestDFSRollback {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
- UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.ROLLBACK,
cluster.getNamesystem().getBlockPoolId());
@@ -283,7 +283,7 @@ public class TestDFSRollback {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
- UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.ROLLBACK,
cluster.getNamesystem().getBlockPoolId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
index d202223..0c09eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
@@ -265,7 +265,7 @@ public class TestDFSStartupVersions {
conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
log("DataNode version info", DATA_NODE, i, versions[i]);
UpgradeUtilities.createDataNodeVersionFile(storage,
- versions[i].storageInfo, bpid, versions[i].blockPoolId);
+ versions[i].storageInfo, bpid, versions[i].blockPoolId, conf);
try {
cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
} catch (Exception ignore) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index fe1ede0..0d9f502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -290,7 +290,7 @@ public class TestDFSUpgrade {
UpgradeUtilities.getCurrentFsscTime(cluster), NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
- UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
.getCurrentBlockPoolID(null));
@@ -308,7 +308,7 @@ public class TestDFSUpgrade {
NodeType.DATA_NODE);
UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
- UpgradeUtilities.getCurrentBlockPoolID(cluster));
+ UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
// Ensure corresponding block pool failed to initialized
startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
.getCurrentBlockPoolID(null));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
index b0504f0..174dea8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
@@ -386,8 +386,10 @@ public class UpgradeUtilities {
new File(datanodeStorage.toString()));
sd.setStorageUuid(DatanodeStorage.generateUuid());
Properties properties = Storage.readPropertiesFile(sd.getVersionFile());
- properties.setProperty("storageID", sd.getStorageUuid());
- Storage.writeProperties(sd.getVersionFile(), properties);
+ if (properties != null) {
+ properties.setProperty("storageID", sd.getStorageUuid());
+ Storage.writeProperties(sd.getVersionFile(), properties);
+ }
retVal[i] = newDir;
}
@@ -463,8 +465,9 @@ public class UpgradeUtilities {
* @param bpid Block pool Id
*/
public static void createDataNodeVersionFile(File[] parent,
- StorageInfo version, String bpid) throws IOException {
- createDataNodeVersionFile(parent, version, bpid, bpid);
+ StorageInfo version, String bpid, Configuration conf)
+ throws IOException {
+ createDataNodeVersionFile(parent, version, bpid, bpid, conf);
}
/**
@@ -479,7 +482,8 @@ public class UpgradeUtilities {
* @param bpidToWrite Block pool Id to write into the version file
*/
public static void createDataNodeVersionFile(File[] parent,
- StorageInfo version, String bpid, String bpidToWrite) throws IOException {
+ StorageInfo version, String bpid, String bpidToWrite, Configuration conf)
+ throws IOException {
DataStorage storage = new DataStorage(version);
storage.setDatanodeUuid("FixedDatanodeUuid");
@@ -487,7 +491,7 @@ public class UpgradeUtilities {
for (int i = 0; i < parent.length; i++) {
File versionFile = new File(parent[i], "VERSION");
StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
- DataStorage.createStorageID(sd, false);
+ DataStorage.createStorageID(sd, false, conf);
storage.writeProperties(versionFile, sd);
versionFiles[i] = versionFile;
File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
new file mode 100644
index 0000000..eaaac22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test for the text based block format for provided block maps.
+ */
+public class TestTextBlockFormat {
+
+ static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
+
+ void check(TextWriter.Options opts, final Path vp,
+ final Class<? extends CompressionCodec> vc) throws IOException {
+ TextFileRegionFormat mFmt = new TextFileRegionFormat() {
+ @Override
+ public TextWriter createWriter(Path file, CompressionCodec codec,
+ String delim, Configuration conf) throws IOException {
+ assertEquals(vp, file);
+ if (null == vc) {
+ assertNull(codec);
+ } else {
+ assertEquals(vc, codec.getClass());
+ }
+ return null; // ignored
+ }
+ };
+ mFmt.getWriter(opts);
+ }
+
+ @Test
+ public void testWriterOptions() throws Exception {
+ TextWriter.Options opts = TextWriter.defaults();
+ assertTrue(opts instanceof WriterOptions);
+ WriterOptions wopts = (WriterOptions) opts;
+ Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+ assertEquals(def, wopts.getFile());
+ assertNull(wopts.getCodec());
+
+ opts.filename(OUTFILE);
+ check(opts, OUTFILE, null);
+
+ opts.filename(OUTFILE);
+ opts.codec("gzip");
+ Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz");
+ check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
+
+ }
+
+ @Test
+ public void testCSVReadWrite() throws Exception {
+ final DataOutputBuffer out = new DataOutputBuffer();
+ FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+ FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+ FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+ try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
+ csv.store(r1);
+ csv.store(r2);
+ csv.store(r3);
+ }
+ Iterator<FileRegion> i3;
+ try (TextReader csv = new TextReader(null, null, null, ",") {
+ @Override
+ public InputStream createStream() {
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(out.getData(), 0, out.getLength());
+ return in;
+ }}) {
+ Iterator<FileRegion> i1 = csv.iterator();
+ assertEquals(r1, i1.next());
+ Iterator<FileRegion> i2 = csv.iterator();
+ assertEquals(r1, i2.next());
+ assertEquals(r2, i2.next());
+ assertEquals(r3, i2.next());
+ assertEquals(r2, i1.next());
+ assertEquals(r3, i1.next());
+
+ assertFalse(i1.hasNext());
+ assertFalse(i2.hasNext());
+ i3 = csv.iterator();
+ }
+ try {
+ i3.next();
+ } catch (IllegalStateException e) {
+ return;
+ }
+ fail("Invalid iterator");
+ }
+
+ @Test
+ public void testCSVReadWriteTsv() throws Exception {
+ final DataOutputBuffer out = new DataOutputBuffer();
+ FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+ FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+ FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+ try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
+ csv.store(r1);
+ csv.store(r2);
+ csv.store(r3);
+ }
+ Iterator<FileRegion> i3;
+ try (TextReader csv = new TextReader(null, null, null, "\t") {
+ @Override
+ public InputStream createStream() {
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(out.getData(), 0, out.getLength());
+ return in;
+ }}) {
+ Iterator<FileRegion> i1 = csv.iterator();
+ assertEquals(r1, i1.next());
+ Iterator<FileRegion> i2 = csv.iterator();
+ assertEquals(r1, i2.next());
+ assertEquals(r2, i2.next());
+ assertEquals(r3, i2.next());
+ assertEquals(r2, i1.next());
+ assertEquals(r3, i1.next());
+
+ assertFalse(i1.hasNext());
+ assertFalse(i2.hasNext());
+ i3 = csv.iterator();
+ }
+ try {
+ i3.next();
+ } catch (IllegalStateException e) {
+ return;
+ }
+ fail("Invalid iterator");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 18b4922..2c0775b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -616,7 +617,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
this.datanode = datanode;
if (storage != null) {
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
- DataStorage.createStorageID(storage.getStorageDir(i), false);
+ DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
}
this.datanodeUuid = storage.getDatanodeUuid();
} else {
@@ -1351,8 +1352,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) throws IOException {
+ public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2e439d6..4539481 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -94,8 +95,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) {
+ public void checkAndUpdate(String bpid, ScanInfo info) {
+ return;
}
@Override
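
Both stub datasets track the DirectoryScanner callback, whose signature changes in this patch from the per-file form checkAndUpdate(bpid, blockId, diskFile, diskMetaFile, vol) to a single ScanInfo argument. A hedged sketch of an FsDatasetSpi stub after the change, mirroring the two overrides above (the body is illustrative only):

  // Illustrative only; mirrors the updated overrides shown in the diffs above.
  @Override
  public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
    // A real dataset would reconcile its replica map for bpid against the
    // scanned state carried by info; these test stubs simply opt out.
    throw new UnsupportedOperationException();
  }
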
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 3293561..25ff1e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -119,11 +119,12 @@ public class TestFsDatasetImpl {
private final static String BLOCKPOOL = "BP-TEST";
- private static Storage.StorageDirectory createStorageDirectory(File root)
+ private static Storage.StorageDirectory createStorageDirectory(File root,
+ Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd = new Storage.StorageDirectory(
StorageLocation.parse(root.toURI().toString()));
- DataStorage.createStorageID(sd, false);
+ DataStorage.createStorageID(sd, false, conf);
return sd;
}
@@ -137,7 +138,7 @@ public class TestFsDatasetImpl {
File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
- dirs.add(createStorageDirectory(loc));
+ dirs.add(createStorageDirectory(loc, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
@@ -197,7 +198,8 @@ public class TestFsDatasetImpl {
String pathUri = new Path(path).toUri().toString();
expectedVolumes.add(new File(pathUri).getAbsolutePath());
StorageLocation loc = StorageLocation.parse(pathUri);
- Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+ Storage.StorageDirectory sd = createStorageDirectory(
+ new File(path), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -315,7 +317,8 @@ public class TestFsDatasetImpl {
String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
StorageLocation loc = StorageLocation.parse(newVolumePath);
- Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
+ Storage.StorageDirectory sd = createStorageDirectory(
+ new File(newVolumePath), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -348,7 +351,7 @@ public class TestFsDatasetImpl {
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));
- Storage.StorageDirectory sd = createStorageDirectory(badDir);
+ Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode),
@@ -492,7 +495,7 @@ public class TestFsDatasetImpl {
String path = BASE_DIR + "/newData0";
String pathUri = new Path(path).toUri().toString();
StorageLocation loc = StorageLocation.parse(pathUri);
- Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+ Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf);
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(
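
The TestFsDatasetImpl hunks are mechanical: each call site now threads the test Configuration into DataStorage.createStorageID, which takes a conf argument in this patch. Assembled outside the diff (a sketch, unchanged in substance from the hunk above), the updated helper reads:

  private static Storage.StorageDirectory createStorageDirectory(File root,
      Configuration conf) throws SecurityException, IOException {
    Storage.StorageDirectory sd = new Storage.StorageDirectory(
        StorageLocation.parse(root.toURI().toString()));
    // createStorageID now also receives the configuration.
    DataStorage.createStorageID(sd, false, conf);
    return sd;
  }
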
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
new file mode 100644
index 0000000..2c119fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic test cases for provided implementation.
+ */
+public class TestProvidedImpl {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestProvidedImpl.class);
+ private static final String BASE_DIR =
+ new FileSystemTestHelper().getTestRootDir();
+ private static final int NUM_LOCAL_INIT_VOLUMES = 1;
+ private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
+ private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
+ private static final int NUM_PROVIDED_BLKS = 10;
+ private static final long BLK_LEN = 128 * 1024;
+ private static final int MIN_BLK_ID = 0;
+ private static final int CHOSEN_BP_ID = 0;
+
+ private static String providedBasePath = BASE_DIR;
+
+ private Configuration conf;
+ private DataNode datanode;
+ private DataStorage storage;
+ private FsDatasetImpl dataset;
+ private static Map<Long, String> blkToPathMap;
+ private static List<FsVolumeImpl> providedVolumes;
+
+ /**
+ * A simple FileRegion iterator for tests.
+ */
+ public static class TestFileRegionIterator implements Iterator<FileRegion> {
+
+ private int numBlocks;
+ private int currentCount;
+ private String basePath;
+
+ public TestFileRegionIterator(String basePath, int minID, int numBlocks) {
+ this.currentCount = minID;
+ this.numBlocks = numBlocks;
+ this.basePath = basePath;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentCount < numBlocks;
+ }
+
+ @Override
+ public FileRegion next() {
+ FileRegion region = null;
+ if (hasNext()) {
+ File newFile = new File(basePath, "file" + currentCount);
+ if (!newFile.exists()) {
+ try {
+ LOG.info("Creating file for blkid " + currentCount);
+ blkToPathMap.put((long) currentCount, newFile.getAbsolutePath());
+ LOG.info("Block id " + currentCount + " corresponds to file " +
+ newFile.getAbsolutePath());
+ newFile.createNewFile();
+ Writer writer = new OutputStreamWriter(
+ new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
+ for (int i = 0; i < BLK_LEN / (Integer.SIZE / 8); i++) {
+ writer.write(currentCount);
+ }
+ writer.flush();
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ region = new FileRegion(currentCount, new Path(newFile.toString()),
+ 0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+ currentCount++;
+ }
+ return region;
+ }
+
+ @Override
+ public void remove() {
+ //do nothing.
+ }
+
+ public void resetMinBlockId(int minId) {
+ currentCount = minId;
+ }
+
+ public void resetBlockCount(int numBlocks) {
+ this.numBlocks = numBlocks;
+ }
+
+ }
+
+ /**
+ * A simple FileRegion provider for tests.
+ */
+ public static class TestFileRegionProvider
+ extends FileRegionProvider implements Configurable {
+
+ private Configuration conf;
+ private int minId;
+ private int numBlocks;
+
+ TestFileRegionProvider() {
+ minId = MIN_BLK_ID;
+ numBlocks = NUM_PROVIDED_BLKS;
+ }
+
+ @Override
+ public Iterator<FileRegion> iterator() {
+ return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void refresh() {
+ //do nothing!
+ }
+
+ public void setMinBlkId(int minId) {
+ this.minId = minId;
+ }
+
+ public void setBlockCount(int numBlocks) {
+ this.numBlocks = numBlocks;
+ }
+ }
+
+ private static Storage.StorageDirectory createLocalStorageDirectory(
+ File root, Configuration conf)
+ throws SecurityException, IOException {
+ Storage.StorageDirectory sd =
+ new Storage.StorageDirectory(
+ StorageLocation.parse(root.toURI().toString()));
+ DataStorage.createStorageID(sd, false, conf);
+ return sd;
+ }
+
+ private static Storage.StorageDirectory createProvidedStorageDirectory(
+ String confString, Configuration conf)
+ throws SecurityException, IOException {
+ Storage.StorageDirectory sd =
+ new Storage.StorageDirectory(StorageLocation.parse(confString));
+ DataStorage.createStorageID(sd, false, conf);
+ return sd;
+ }
+
+ private static void createStorageDirs(DataStorage storage,
+ Configuration conf, int numDirs, int numProvidedDirs)
+ throws IOException {
+ List<Storage.StorageDirectory> dirs =
+ new ArrayList<Storage.StorageDirectory>();
+ List<String> dirStrings = new ArrayList<String>();
+ FileUtils.deleteDirectory(new File(BASE_DIR));
+ for (int i = 0; i < numDirs; i++) {
+ File loc = new File(BASE_DIR, "data" + i);
+ dirStrings.add(new Path(loc.toString()).toUri().toString());
+ loc.mkdirs();
+ dirs.add(createLocalStorageDirectory(loc, conf));
+ when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+ }
+
+ for (int i = numDirs; i < numDirs + numProvidedDirs; i++) {
+ File loc = new File(BASE_DIR, "data" + i);
+ providedBasePath = loc.getAbsolutePath();
+ loc.mkdirs();
+ String dirString = "[PROVIDED]" +
+ new Path(loc.toString()).toUri().toString();
+ dirStrings.add(dirString);
+ dirs.add(createProvidedStorageDirectory(dirString, conf));
+ when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+ }
+
+ String dataDir = StringUtils.join(",", dirStrings);
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+ when(storage.dirIterator()).thenReturn(dirs.iterator());
+ when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs);
+ }
+
+ private int getNumVolumes() {
+ try (FsDatasetSpi.FsVolumeReferences volumes =
+ dataset.getFsVolumeReferences()) {
+ return volumes.size();
+ } catch (IOException e) {
+ return 0;
+ }
+ }
+
+ private void compareBlkFile(InputStream ins, String filepath)
+ throws FileNotFoundException, IOException {
+ try (ReadableByteChannel i = Channels.newChannel(
+ new FileInputStream(new File(filepath)))) {
+ try (ReadableByteChannel j = Channels.newChannel(ins)) {
+ ByteBuffer ib = ByteBuffer.allocate(4096);
+ ByteBuffer jb = ByteBuffer.allocate(4096);
+ while (true) {
+ int il = i.read(ib);
+ int jl = j.read(jb);
+ if (il < 0 || jl < 0) {
+ assertEquals(il, jl);
+ break;
+ }
+ ib.flip();
+ jb.flip();
+ int cmp = Math.min(ib.remaining(), jb.remaining());
+ for (int k = 0; k < cmp; ++k) {
+ assertEquals(ib.get(), jb.get());
+ }
+ ib.compact();
+ jb.compact();
+ }
+ }
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ datanode = mock(DataNode.class);
+ storage = mock(DataStorage.class);
+ this.conf = new Configuration();
+ this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+
+ when(datanode.getConf()).thenReturn(conf);
+ final DNConf dnConf = new DNConf(datanode);
+ when(datanode.getDnConf()).thenReturn(dnConf);
+
+ final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+ when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+ final ShortCircuitRegistry shortCircuitRegistry =
+ new ShortCircuitRegistry(conf);
+ when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
+
+ this.conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+ TestFileRegionProvider.class, FileRegionProvider.class);
+
+ blkToPathMap = new HashMap<Long, String>();
+ providedVolumes = new LinkedList<FsVolumeImpl>();
+
+ createStorageDirs(
+ storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES);
+
+ dataset = new FsDatasetImpl(datanode, storage, conf);
+ FsVolumeReferences volumes = dataset.getFsVolumeReferences();
+ for (int i = 0; i < volumes.size(); i++) {
+ FsVolumeSpi vol = volumes.get(i);
+ if (vol.getStorageType() == StorageType.PROVIDED) {
+ providedVolumes.add((FsVolumeImpl) vol);
+ }
+ }
+
+ for (String bpid : BLOCK_POOL_IDS) {
+ dataset.addBlockPool(bpid, conf);
+ }
+
+ assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES,
+ getNumVolumes());
+ assertEquals(0, dataset.getNumFailedVolumes());
+ }
+
+ @Test
+ public void testProvidedStorageID() throws IOException {
+ for (int i = 0; i < providedVolumes.size(); i++) {
+ assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
+ providedVolumes.get(i).getStorageID());
+ }
+ }
+
+ @Test
+ public void testBlockLoad() throws IOException {
+ for (int i = 0; i < providedVolumes.size(); i++) {
+ FsVolumeImpl vol = providedVolumes.get(i);
+ ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ vol.getVolumeMap(volumeMap, null);
+
+ assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
+ for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
+ if (j != CHOSEN_BP_ID) {
+ //this block pool should not have any blocks
+ assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
+ }
+ }
+ assertEquals(NUM_PROVIDED_BLKS,
+ volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+ }
+ }
+
+ @Test
+ public void testProvidedBlockRead() throws IOException {
+ for (int id = 0; id < NUM_PROVIDED_BLKS; id++) {
+ ExtendedBlock eb = new ExtendedBlock(
+ BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN,
+ HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+ InputStream ins = dataset.getBlockInputStream(eb, 0);
+ String filepath = blkToPathMap.get((long) id);
+ compareBlkFile(ins, filepath);
+ }
+ }
+
+ @Test
+ public void testProvidedBlockIterator() throws IOException {
+ for (int i = 0; i < providedVolumes.size(); i++) {
+ FsVolumeImpl vol = providedVolumes.get(i);
+ BlockIterator iter =
+ vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp");
+ Set<Long> blockIdsUsed = new HashSet<Long>();
+ while (!iter.atEnd()) {
+ ExtendedBlock eb = iter.nextBlock();
+ long blkId = eb.getBlockId();
+ assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
+ //all block ids must be unique!
+ assertTrue(!blockIdsUsed.contains(blkId));
+ blockIdsUsed.add(blkId);
+ }
+ assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size());
+ }
+ }
+
+
+ @Test
+ public void testRefresh() throws IOException {
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+ for (int i = 0; i < providedVolumes.size(); i++) {
+ ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
+ TestFileRegionProvider provider = (TestFileRegionProvider)
+ vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+ //equivalent to two new blocks appearing
+ provider.setBlockCount(NUM_PROVIDED_BLKS + 2);
+ //equivalent to deleting the first block
+ provider.setMinBlkId(MIN_BLK_ID + 1);
+
+ DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
+ scanner.reconcile();
+ ReplicaInfo info = dataset.getBlockReplica(
+ BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
+ //new replica should be added to the dataset
+ assertTrue(info != null);
+ try {
+ info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
+ } catch (Exception ex) {
+ LOG.info("Exception expected: " + ex);
+ }
+ }
+ }
+}
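
TestProvidedImpl ties the pieces together: it registers a FileRegionProvider implementation, tags one storage location as PROVIDED, and then checks that provided blocks surface through the ordinary FsDatasetImpl paths (volume map, block input streams, block iterators, and DirectoryScanner reconciliation). A hedged configuration sketch distilled from setUp() and createStorageDirs() above; the key names and the [PROVIDED] prefix come from that code, the paths are placeholders, and datanode/storage stand for the mocks built in setUp():

  // Sketch only: mirrors the wiring in TestProvidedImpl.setUp().
  Configuration conf = new Configuration();
  // Provider that enumerates the FileRegions backing the provided storage.
  conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
      TestProvidedImpl.TestFileRegionProvider.class, FileRegionProvider.class);
  // A provided location is listed with the [PROVIDED] prefix next to local dirs.
  conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
      "/tmp/data0,[PROVIDED]file:///tmp/provided0");  // placeholder paths
  // Provided replicas then appear through the same dataset as local ones.
  FsDatasetImpl dataset = new FsDatasetImpl(datanode, storage, conf);
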
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
index fa3399b..236627e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
@@ -64,7 +64,10 @@ public class TestClusterId {
fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);
StorageDirectory sd = sdit.next();
Properties props = Storage.readPropertiesFile(sd.getVersionFile());
- String cid = props.getProperty("clusterID");
+ String cid = null;
+ if (props != null) {
+ cid = props.getProperty("clusterID");
+ }
LOG.info("successfully formated : sd="+sd.getCurrentDir() + ";cid="+cid);
return cid;
}