Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/05/11 21:42:41 UTC
[45/50] [abbrv] hadoop git commit: HDFS-10675. Datanode support to read from external stores.
HDFS-10675. Datanode support to read from external stores.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68e3c683
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68e3c683
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68e3c683
Branch: refs/heads/HDFS-9806
Commit: 68e3c683d2ddd957ba76d72cc60c194c9f16944d
Parents: 83dd14a
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Wed Mar 29 14:29:28 2017 -0700
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Thu May 11 14:41:16 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/StorageType.java | 3 +-
.../org/apache/hadoop/fs/shell/TestCount.java | 3 +-
.../hadoop/hdfs/protocol/HdfsConstants.java | 4 +
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 4 +
.../src/main/proto/hdfs.proto | 1 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 15 +
.../hadoop/hdfs/server/common/BlockAlias.java | 29 +
.../hadoop/hdfs/server/common/BlockFormat.java | 82 +++
.../hadoop/hdfs/server/common/FileRegion.java | 121 +++++
.../hdfs/server/common/FileRegionProvider.java | 37 ++
.../hadoop/hdfs/server/common/Storage.java | 71 ++-
.../hadoop/hdfs/server/common/StorageInfo.java | 6 +
.../server/common/TextFileRegionFormat.java | 442 ++++++++++++++++
.../server/common/TextFileRegionProvider.java | 88 ++++
.../server/datanode/BlockPoolSliceStorage.java | 21 +-
.../hdfs/server/datanode/DataStorage.java | 44 +-
.../hdfs/server/datanode/DirectoryScanner.java | 20 +-
.../datanode/FinalizedProvidedReplica.java | 91 ++++
.../hdfs/server/datanode/ProvidedReplica.java | 248 +++++++++
.../hdfs/server/datanode/ReplicaBuilder.java | 100 +++-
.../hdfs/server/datanode/ReplicaInfo.java | 20 +-
.../hdfs/server/datanode/StorageLocation.java | 26 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 4 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 32 +-
.../fsdataset/impl/DefaultProvidedVolumeDF.java | 58 ++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 40 +-
.../datanode/fsdataset/impl/FsDatasetUtil.java | 25 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 19 +-
.../fsdataset/impl/FsVolumeImplBuilder.java | 6 +
.../fsdataset/impl/ProvidedVolumeDF.java | 34 ++
.../fsdataset/impl/ProvidedVolumeImpl.java | 526 +++++++++++++++++++
.../apache/hadoop/hdfs/server/mover/Mover.java | 2 +-
.../server/namenode/FSImageCompression.java | 2 +-
.../hadoop/hdfs/server/namenode/NNStorage.java | 10 +-
.../src/main/resources/hdfs-default.xml | 78 +++
.../org/apache/hadoop/hdfs/TestDFSRollback.java | 6 +-
.../hadoop/hdfs/TestDFSStartupVersions.java | 2 +-
.../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 4 +-
.../apache/hadoop/hdfs/UpgradeUtilities.java | 16 +-
.../hdfs/server/common/TestTextBlockFormat.java | 160 ++++++
.../server/datanode/SimulatedFSDataset.java | 6 +-
.../extdataset/ExternalDatasetImpl.java | 5 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 17 +-
.../fsdataset/impl/TestProvidedImpl.java | 426 +++++++++++++++
.../hdfs/server/namenode/TestClusterId.java | 5 +-
45 files changed, 2872 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
index 0948801..2ecd206 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
@@ -37,7 +37,8 @@ public enum StorageType {
RAM_DISK(true),
SSD(false),
DISK(false),
- ARCHIVE(false);
+ ARCHIVE(false),
+ PROVIDED(false);
private final boolean isTransient;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
index 2a1c38c..1666a3c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
@@ -285,7 +285,7 @@ public class TestCount {
// <----13---> <-------17------> <----13-----> <------17------->
" SSD_QUOTA REM_SSD_QUOTA DISK_QUOTA REM_DISK_QUOTA " +
// <----13---> <-------17------>
- "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+ "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
"PATHNAME";
verify(out).println(withStorageTypeHeader);
verifyNoMoreInteractions(out);
@@ -340,6 +340,7 @@ public class TestCount {
" SSD_QUOTA REM_SSD_QUOTA " +
" DISK_QUOTA REM_DISK_QUOTA " +
"ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+ "PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
"PATHNAME";
verify(out).println(withStorageTypeHeader);
verifyNoMoreInteractions(out);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 0d31bc4..7e92dad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -47,6 +47,10 @@ public final class HdfsConstants {
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
public static final byte COLD_STORAGE_POLICY_ID = 2;
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
+ // branch HDFS-9806 XXX temporary until HDFS-7076
+ public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
+ public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
+
// TODO should be conf injected?
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 614f653..18f0267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -381,6 +381,8 @@ public class PBHelperClient {
return StorageTypeProto.ARCHIVE;
case RAM_DISK:
return StorageTypeProto.RAM_DISK;
+ case PROVIDED:
+ return StorageTypeProto.PROVIDED;
default:
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
@@ -397,6 +399,8 @@ public class PBHelperClient {
return StorageType.ARCHIVE;
case RAM_DISK:
return StorageType.RAM_DISK;
+ case PROVIDED:
+ return StorageType.PROVIDED;
default:
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 08ed3c8..470304a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -210,6 +210,7 @@ enum StorageTypeProto {
SSD = 2;
ARCHIVE = 3;
RAM_DISK = 4;
+ PROVIDED = 5;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b95c7e6..6406d35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -314,6 +314,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.edits.asynclogging";
public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
+ public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
+ public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
+ public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
+ public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
+ public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class";
+
+ public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter";
+ public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
+
+ public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path";
+ public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv";
+
+ public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec";
+ public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path";
+
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
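As a minimal sketch (not part of the patch itself), the new keys can be set on a Configuration before bringing up a datanode with a PROVIDED volume; the read path shown is illustrative, while the delimiter and storage id shown are the defaults defined above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class ProvidedConfSketch {
    public static Configuration providedConf() {
      Configuration conf = new Configuration();
      // where the delimited-text block map is read from
      // (defaults to file:///tmp/blocks.csv)
      conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
          "file:///var/hdfs/blocks.csv");
      // field delimiter within each block map line (default ",")
      conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
      // single storage id shared by the PROVIDED storage (default "DS-PROVIDED")
      conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, "DS-PROVIDED");
      return conf;
    }
  }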
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
new file mode 100644
index 0000000..b2fac97
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Interface used to load provided blocks.
+ */
+public interface BlockAlias {
+
+ Block getBlock();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
new file mode 100644
index 0000000..66e7fdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * An abstract class used to read and write block maps for provided blocks.
+ */
+public abstract class BlockFormat<T extends BlockAlias> {
+
+ /**
+ * An abstract class that is used to read {@link BlockAlias}es
+ * for provided blocks.
+ */
+ public static abstract class Reader<U extends BlockAlias>
+ implements Iterable<U>, Closeable {
+
+ /**
+ * reader options.
+ */
+ public interface Options { }
+
+ public abstract U resolve(Block ident) throws IOException;
+
+ }
+
+ /**
+ * Returns the reader for the provided block map.
+ * @param opts reader options
+ * @return {@link Reader} to the block map.
+ * @throws IOException
+ */
+ public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
+
+ /**
+ * An abstract class used as a writer for the provided block map.
+ */
+ public static abstract class Writer<U extends BlockAlias>
+ implements Closeable {
+ /**
+ * writer options.
+ */
+ public interface Options { }
+
+ public abstract void store(U token) throws IOException;
+
+ }
+
+ /**
+ * Returns the writer for the provided block map.
+ * @param opts writer options.
+ * @return {@link Writer} to the block map.
+ * @throws IOException
+ */
+ public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
+
+ /**
+ * Refresh based on the underlying block map.
+ * @throws IOException
+ */
+ public abstract void refresh() throws IOException;
+
+}
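A rough usage sketch of the contract above, assuming a concrete BlockFormat<FileRegion> (such as the TextFileRegionFormat added later in this commit); getReader(null) falls back to the format's default options:

  import java.io.IOException;
  import org.apache.hadoop.hdfs.server.common.BlockFormat;
  import org.apache.hadoop.hdfs.server.common.FileRegion;

  class BlockFormatSketch {
    // Print every provided block recorded in the block map.
    static void dump(BlockFormat<FileRegion> fmt) throws IOException {
      // Reader is Iterable and Closeable, so try-with-resources applies
      try (BlockFormat.Reader<FileRegion> reader = fmt.getReader(null)) {
        for (FileRegion region : reader) {
          System.out.println(region.getBlock());
        }
      }
    }
  }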
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
new file mode 100644
index 0000000..c568b90
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * This class is used to represent provided blocks that are file regions,
+ * i.e., can be described using (path, offset, length).
+ */
+public class FileRegion implements BlockAlias {
+
+ private final Path path;
+ private final long offset;
+ private final long length;
+ private final long blockId;
+ private final String bpid;
+ private final long genStamp;
+
+ public FileRegion(long blockId, Path path, long offset,
+ long length, String bpid, long genStamp) {
+ this.path = path;
+ this.offset = offset;
+ this.length = length;
+ this.blockId = blockId;
+ this.bpid = bpid;
+ this.genStamp = genStamp;
+ }
+
+ public FileRegion(long blockId, Path path, long offset,
+ long length, String bpid) {
+ this(blockId, path, offset, length, bpid,
+ HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+
+ }
+
+ public FileRegion(long blockId, Path path, long offset,
+ long length, long genStamp) {
+ this(blockId, path, offset, length, null, genStamp);
+
+ }
+
+ public FileRegion(long blockId, Path path, long offset, long length) {
+ this(blockId, path, offset, length, null);
+ }
+
+ @Override
+ public Block getBlock() {
+ return new Block(blockId, length, genStamp);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FileRegion)) {
+ return false;
+ }
+ FileRegion o = (FileRegion) other;
+ return blockId == o.blockId
+ && offset == o.offset
+ && length == o.length
+ && genStamp == o.genStamp
+ && path.equals(o.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)(blockId & Integer.MIN_VALUE);
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public long getGenerationStamp() {
+ return genStamp;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ block=\"").append(getBlock()).append("\"");
+ sb.append(", path=\"").append(getPath()).append("\"");
+ sb.append(", off=\"").append(getOffset()).append("\"");
+ sb.append(", len=\"").append(getBlock().getNumBytes()).append("\"");
+ sb.append(", genStamp=\"").append(getBlock()
+ .getGenerationStamp()).append("\"");
+ sb.append(", bpid=\"").append(bpid).append("\"");
+ sb.append(" }");
+ return sb.toString();
+ }
+
+ public String getBlockPoolId() {
+ return this.bpid;
+ }
+
+}
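For example (values invented for illustration), a region mapping HDFS block 1001 to the first 128MB of an external file uses the short constructor, which leaves bpid null and the generation stamp at GRANDFATHER_GENERATION_STAMP:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.server.common.FileRegion;

  class FileRegionSketch {
    public static void main(String[] args) {
      // block 1001 backed by bytes [0, 128MB) of an external file
      FileRegion r = new FileRegion(1001L,
          new Path("file:///data/part-00000"), 0L, 128L * 1024 * 1024);
      System.out.println(r.getBlock().getNumBytes()); // 134217728
    }
  }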
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
new file mode 100644
index 0000000..2e94239
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * This class is a stub for reading file regions from the block map.
+ */
+public class FileRegionProvider implements Iterable<FileRegion> {
+ @Override
+ public Iterator<FileRegion> iterator() {
+ return Collections.emptyListIterator();
+ }
+
+ public void refresh() throws IOException {
+ return;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 414d3a7..9ad61d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -196,7 +197,10 @@ public abstract class Storage extends StorageInfo {
Iterator<StorageDirectory> it =
(dirType == null) ? dirIterator() : dirIterator(dirType);
for ( ;it.hasNext(); ) {
- list.add(new File(it.next().getCurrentDir(), fileName));
+ File currentDir = it.next().getCurrentDir();
+ if (currentDir != null) {
+ list.add(new File(currentDir, fileName));
+ }
}
return list;
}
@@ -328,10 +332,20 @@ public abstract class Storage extends StorageInfo {
*/
public StorageDirectory(String bpid, StorageDirType dirType,
boolean isShared, StorageLocation location) {
- this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType,
+ this(getBlockPoolCurrentDir(bpid, location), dirType,
isShared, location);
}
+ private static File getBlockPoolCurrentDir(String bpid,
+ StorageLocation location) {
+ if (location == null ||
+ location.getStorageType() == StorageType.PROVIDED) {
+ return null;
+ } else {
+ return new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT));
+ }
+ }
+
private StorageDirectory(File dir, StorageDirType dirType,
boolean isShared, StorageLocation location) {
this.root = dir;
@@ -347,7 +361,8 @@ public abstract class Storage extends StorageInfo {
}
private static File getStorageLocationFile(StorageLocation location) {
- if (location == null) {
+ if (location == null ||
+ location.getStorageType() == StorageType.PROVIDED) {
return null;
}
try {
@@ -406,6 +421,10 @@ public abstract class Storage extends StorageInfo {
*/
public void clearDirectory() throws IOException {
File curDir = this.getCurrentDir();
+ if (curDir == null) {
+ //if the directory is null, there is nothing to do.
+ return;
+ }
if (curDir.exists()) {
File[] files = FileUtil.listFiles(curDir);
LOG.info("Will remove files: " + Arrays.toString(files));
@@ -423,6 +442,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getCurrentDir() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_DIR_CURRENT);
}
@@ -443,6 +465,9 @@ public abstract class Storage extends StorageInfo {
* @return the version file path
*/
public File getVersionFile() {
+ if (root == null) {
+ return null;
+ }
return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION);
}
@@ -452,6 +477,9 @@ public abstract class Storage extends StorageInfo {
* @return the previous version file path
*/
public File getPreviousVersionFile() {
+ if (root == null) {
+ return null;
+ }
return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION);
}
@@ -462,6 +490,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousDir() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_DIR_PREVIOUS);
}
@@ -476,6 +507,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousTmp() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_TMP_PREVIOUS);
}
@@ -490,6 +524,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getRemovedTmp() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_TMP_REMOVED);
}
@@ -503,6 +540,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getFinalizedTmp() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_TMP_FINALIZED);
}
@@ -517,6 +557,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getLastCheckpointTmp() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_TMP_LAST_CKPT);
}
@@ -530,6 +573,9 @@ public abstract class Storage extends StorageInfo {
* @return the directory path
*/
public File getPreviousCheckpoint() {
+ if (root == null) {
+ return null;
+ }
return new File(root, STORAGE_PREVIOUS_CKPT);
}
@@ -543,7 +589,7 @@ public abstract class Storage extends StorageInfo {
private void checkEmptyCurrent() throws InconsistentFSStateException,
IOException {
File currentDir = getCurrentDir();
- if(!currentDir.exists()) {
+ if(currentDir == null || !currentDir.exists()) {
// if current/ does not exist, it's safe to format it.
return;
}
@@ -589,6 +635,13 @@ public abstract class Storage extends StorageInfo {
public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
boolean checkCurrentIsEmpty)
throws IOException {
+
+ if (location != null &&
+ location.getStorageType() == StorageType.PROVIDED) {
+ //currently we assume that PROVIDED storages are always NORMAL
+ return StorageState.NORMAL;
+ }
+
assert root != null : "root is null";
boolean hadMkdirs = false;
String rootPath = root.getCanonicalPath();
@@ -710,6 +763,10 @@ public abstract class Storage extends StorageInfo {
*/
public void doRecover(StorageState curState) throws IOException {
File curDir = getCurrentDir();
+ if (curDir == null || root == null) {
+ //at this point, we do not support recovery on PROVIDED storages
+ return;
+ }
String rootPath = root.getCanonicalPath();
switch(curState) {
case COMPLETE_UPGRADE: // mv previous.tmp -> previous
@@ -883,7 +940,8 @@ public abstract class Storage extends StorageInfo {
@Override
public String toString() {
- return "Storage Directory " + this.root;
+ return "Storage Directory root= " + this.root +
+ "; location= " + this.location;
}
/**
@@ -1153,6 +1211,9 @@ public abstract class Storage extends StorageInfo {
}
public void writeProperties(File to, StorageDirectory sd) throws IOException {
+ if (to == null) {
+ return;
+ }
Properties props = new Properties();
setPropertiesFromFields(props, sd);
writeProperties(to, props);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 50363c9..28871e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -152,6 +152,9 @@ public class StorageInfo {
*/
protected void setFieldsFromProperties(
Properties props, StorageDirectory sd) throws IOException {
+ if (props == null) {
+ return;
+ }
setLayoutVersion(props, sd);
setNamespaceID(props, sd);
setcTime(props, sd);
@@ -241,6 +244,9 @@ public class StorageInfo {
}
public static Properties readPropertiesFile(File from) throws IOException {
+ if (from == null) {
+ return null;
+ }
RandomAccessFile file = new RandomAccessFile(from, "rws");
FileInputStream in = null;
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
new file mode 100644
index 0000000..eacd08f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for block maps stored as text files,
+ * with a specified delimiter.
+ */
+public class TextFileRegionFormat
+ extends BlockFormat<FileRegion> implements Configurable {
+
+ private Configuration conf;
+ private ReaderOptions readerOpts = TextReader.defaults();
+ private WriterOptions writerOpts = TextWriter.defaults();
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TextFileRegionFormat.class);
+ @Override
+ public void setConf(Configuration conf) {
+ readerOpts.setConf(conf);
+ writerOpts.setConf(conf);
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Reader<FileRegion> getReader(Reader.Options opts)
+ throws IOException {
+ if (null == opts) {
+ opts = readerOpts;
+ }
+ if (!(opts instanceof ReaderOptions)) {
+ throw new IllegalArgumentException("Invalid options " + opts.getClass());
+ }
+ ReaderOptions o = (ReaderOptions) opts;
+ Configuration readerConf = (null == o.getConf())
+ ? new Configuration()
+ : o.getConf();
+ return createReader(o.file, o.delim, readerConf);
+ }
+
+ @VisibleForTesting
+ TextReader createReader(Path file, String delim, Configuration cfg)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(cfg);
+ if (fs instanceof LocalFileSystem) {
+ fs = ((LocalFileSystem)fs).getRaw();
+ }
+ CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+ CompressionCodec codec = factory.getCodec(file);
+ return new TextReader(fs, file, codec, delim);
+ }
+
+ @Override
+ public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
+ if (null == opts) {
+ opts = writerOpts;
+ }
+ if (!(opts instanceof WriterOptions)) {
+ throw new IllegalArgumentException("Invalid options " + opts.getClass());
+ }
+ WriterOptions o = (WriterOptions) opts;
+ Configuration cfg = (null == o.getConf())
+ ? new Configuration()
+ : o.getConf();
+ if (o.codec != null) {
+ CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+ CompressionCodec codec = factory.getCodecByName(o.codec);
+ String name = o.file.getName() + codec.getDefaultExtension();
+ o.filename(new Path(o.file.getParent(), name));
+ return createWriter(o.file, codec, o.delim, cfg);
+ }
+ return createWriter(o.file, null, o.delim, conf);
+ }
+
+ @VisibleForTesting
+ TextWriter createWriter(Path file, CompressionCodec codec, String delim,
+ Configuration cfg) throws IOException {
+ FileSystem fs = file.getFileSystem(cfg);
+ if (fs instanceof LocalFileSystem) {
+ fs = ((LocalFileSystem)fs).getRaw();
+ }
+ OutputStream tmp = fs.create(file);
+ java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
+ (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
+ return new TextWriter(out, delim);
+ }
+
+ /**
+ * Class specifying reader options for the {@link TextFileRegionFormat}.
+ */
+ public static class ReaderOptions
+ implements TextReader.Options, Configurable {
+
+ private Configuration conf;
+ private String delim =
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+ private Path file = new Path(
+ new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
+ .toURI().toString());
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+ file = new Path(tmpfile);
+ delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+ LOG.info("TextFileRegionFormat: read path " + tmpfile.toString());
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public ReaderOptions filename(Path file) {
+ this.file = file;
+ return this;
+ }
+
+ @Override
+ public ReaderOptions delimiter(String delim) {
+ this.delim = delim;
+ return this;
+ }
+ }
+
+ /**
+ * Class specifying writer options for the {@link TextFileRegionFormat}.
+ */
+ public static class WriterOptions
+ implements TextWriter.Options, Configurable {
+
+ private Configuration conf;
+ private String codec = null;
+ private Path file =
+ new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+ private String delim =
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ String tmpfile = conf.get(
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
+ file = new Path(tmpfile);
+ codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
+ delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+ DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public WriterOptions filename(Path file) {
+ this.file = file;
+ return this;
+ }
+
+ public String getCodec() {
+ return codec;
+ }
+
+ public Path getFile() {
+ return file;
+ }
+
+ @Override
+ public WriterOptions codec(String codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ @Override
+ public WriterOptions delimiter(String delim) {
+ this.delim = delim;
+ return this;
+ }
+
+ }
+
+ /**
+ * This class is used as a reader for block maps which
+ * are stored as delimited text files.
+ */
+ public static class TextReader extends Reader<FileRegion> {
+
+ /**
+ * Options for {@link TextReader}.
+ */
+ public interface Options extends Reader.Options {
+ Options filename(Path file);
+ Options delimiter(String delim);
+ }
+
+ static ReaderOptions defaults() {
+ return new ReaderOptions();
+ }
+
+ private final Path file;
+ private final String delim;
+ private final FileSystem fs;
+ private final CompressionCodec codec;
+ private final Map<FRIterator, BufferedReader> iterators;
+
+ protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
+ String delim) {
+ this(fs, file, codec, delim,
+ new IdentityHashMap<FRIterator, BufferedReader>());
+ }
+
+ TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
+ Map<FRIterator, BufferedReader> iterators) {
+ this.fs = fs;
+ this.file = file;
+ this.codec = codec;
+ this.delim = delim;
+ this.iterators = Collections.synchronizedMap(iterators);
+ }
+
+ @Override
+ public FileRegion resolve(Block ident) throws IOException {
+ // consider layering index w/ composable format
+ Iterator<FileRegion> i = iterator();
+ try {
+ while (i.hasNext()) {
+ FileRegion f = i.next();
+ if (f.getBlock().equals(ident)) {
+ return f;
+ }
+ }
+ } finally {
+ BufferedReader r = iterators.remove(i);
+ if (r != null) {
+ // null on last element
+ r.close();
+ }
+ }
+ return null;
+ }
+
+ class FRIterator implements Iterator<FileRegion> {
+
+ private FileRegion pending;
+
+ @Override
+ public boolean hasNext() {
+ return pending != null;
+ }
+
+ @Override
+ public FileRegion next() {
+ if (null == pending) {
+ throw new NoSuchElementException();
+ }
+ FileRegion ret = pending;
+ try {
+ pending = nextInternal(this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
+ BufferedReader r = iterators.get(i);
+ if (null == r) {
+ throw new IllegalStateException();
+ }
+ String line = r.readLine();
+ if (null == line) {
+ iterators.remove(i);
+ return null;
+ }
+ String[] f = line.split(delim);
+ if (f.length != 6) {
+ throw new IOException("Invalid line: " + line);
+ }
+ return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
+ Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
+ Long.parseLong(f[4]));
+ }
+
+ public InputStream createStream() throws IOException {
+ InputStream i = fs.open(file);
+ if (codec != null) {
+ i = codec.createInputStream(i);
+ }
+ return i;
+ }
+
+ @Override
+ public Iterator<FileRegion> iterator() {
+ FRIterator i = new FRIterator();
+ try {
+ BufferedReader r =
+ new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
+ iterators.put(i, r);
+ i.pending = nextInternal(i);
+ } catch (IOException e) {
+ iterators.remove(i);
+ throw new RuntimeException(e);
+ }
+ return i;
+ }
+
+ @Override
+ public void close() throws IOException {
+ ArrayList<IOException> ex = new ArrayList<>();
+ synchronized (iterators) {
+ for (Iterator<BufferedReader> i = iterators.values().iterator();
+ i.hasNext();) {
+ try {
+ BufferedReader r = i.next();
+ r.close();
+ } catch (IOException e) {
+ ex.add(e);
+ } finally {
+ i.remove();
+ }
+ }
+ iterators.clear();
+ }
+ if (!ex.isEmpty()) {
+ throw MultipleIOException.createIOException(ex);
+ }
+ }
+
+ }
+
+ /**
+ * This class is used as a writer for block maps which
+ * are stored as delimited text files.
+ */
+ public static class TextWriter extends Writer<FileRegion> {
+
+ /**
+ * Interface for Writer options.
+ */
+ public interface Options extends Writer.Options {
+ Options codec(String codec);
+ Options filename(Path file);
+ Options delimiter(String delim);
+ }
+
+ public static WriterOptions defaults() {
+ return new WriterOptions();
+ }
+
+ private final String delim;
+ private final java.io.Writer out;
+
+ public TextWriter(java.io.Writer out, String delim) {
+ this.out = out;
+ this.delim = delim;
+ }
+
+ @Override
+ public void store(FileRegion token) throws IOException {
+ out.append(String.valueOf(token.getBlock().getBlockId())).append(delim);
+ out.append(token.getPath().toString()).append(delim);
+ out.append(Long.toString(token.getOffset())).append(delim);
+ out.append(Long.toString(token.getLength())).append(delim);
+ out.append(Long.toString(token.getGenerationStamp())).append(delim);
+ out.append(token.getBlockPoolId()).append("\n");
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ //nothing to do;
+ }
+
+}
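Putting the pieces above together: store() emits one line per region in the order blockId, path, offset, length, genStamp, bpid, which is exactly the order nextInternal() parses. A hedged end-to-end sketch (the block-pool id and all values are invented):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.server.common.BlockFormat;
  import org.apache.hadoop.hdfs.server.common.FileRegion;
  import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;

  class TextFormatSketch {
    public static void main(String[] args) throws Exception {
      TextFileRegionFormat fmt = new TextFileRegionFormat();
      fmt.setConf(new Configuration()); // write path defaults to file:///tmp/blocks.csv
      try (BlockFormat.Writer<FileRegion> w = fmt.getWriter(null)) {
        w.store(new FileRegion(1001L, new Path("file:///data/part-00000"),
            0L, 134217728L, "BP-1234-127.0.0.1-5678", 1001L));
      }
      // resulting line in the block map:
      // 1001,file:///data/part-00000,0,134217728,1001,BP-1234-127.0.0.1-5678
    }
  }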
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
new file mode 100644
index 0000000..0fa667e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class is used to read file regions from block maps
+ * specified using delimited text.
+ */
+public class TextFileRegionProvider
+ extends FileRegionProvider implements Configurable {
+
+ private Configuration conf;
+ private BlockFormat<FileRegion> fmt;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setConf(Configuration conf) {
+ fmt = ReflectionUtils.newInstance(
+ conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+ TextFileRegionFormat.class,
+ BlockFormat.class),
+ conf);
+ ((Configurable)fmt).setConf(conf); //redundant?
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Iterator<FileRegion> iterator() {
+ try {
+ final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
+ return new Iterator<FileRegion>() {
+
+ private final Iterator<FileRegion> inner = r.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public FileRegion next() {
+ return inner.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read provided blocks", e);
+ }
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ fmt.refresh();
+ }
+}
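A sketch of loading a provider reflectively through the dfs.provider.class key and iterating it; the default class argument here is an assumption, mirroring how DFS_PROVIDER_BLK_FORMAT_CLASS is resolved above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;
  import org.apache.hadoop.hdfs.server.common.FileRegion;
  import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
  import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
  import org.apache.hadoop.util.ReflectionUtils;

  class ProviderSketch {
    static FileRegionProvider newProvider(Configuration conf) {
      return ReflectionUtils.newInstance(
          conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
              TextFileRegionProvider.class, FileRegionProvider.class),
          conf); // newInstance calls setConf on Configurable providers
    }

    static void listBlocks(Configuration conf) {
      for (FileRegion region : newProvider(conf)) {
        System.out.println(region);
      }
    }
  }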
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index bc41715..012d1f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -360,6 +361,9 @@ public class BlockPoolSliceStorage extends Storage {
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
+ if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+ return false; // regular startup for PROVIDED storage directories
+ }
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@@ -439,6 +443,10 @@ public class BlockPoolSliceStorage extends Storage {
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
return;
}
+ //no upgrades for storage directories that are PROVIDED
+ if (bpSd.getRoot() == null) {
+ return;
+ }
final int oldLV = getLayoutVersion();
LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
+ ".\n old LV = " + oldLV
@@ -589,8 +597,9 @@ public class BlockPoolSliceStorage extends Storage {
throws IOException {
File prevDir = bpSd.getPreviousDir();
// regular startup if previous dir does not exist
- if (!prevDir.exists())
+ if (prevDir == null || !prevDir.exists()) {
return;
+ }
// read attributes out of the VERSION file of previous directory
BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
prevInfo.readPreviousVersionProperties(bpSd);
@@ -631,6 +640,10 @@ public class BlockPoolSliceStorage extends Storage {
* that holds the snapshot.
*/
void doFinalize(File dnCurDir) throws IOException {
+ LOG.info("doFinalize: " + dnCurDir);
+ if (dnCurDir == null) {
+ return; //we do nothing if the directory is null
+ }
File bpRoot = getBpRoot(blockpoolID, dnCurDir);
StorageDirectory bpSd = new StorageDirectory(bpRoot);
// block pool level previous directory
@@ -841,6 +854,9 @@ public class BlockPoolSliceStorage extends Storage {
public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
+ if (sd.getCurrentDir() == null) {
+ return;
+ }
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
@@ -863,6 +879,9 @@ public class BlockPoolSliceStorage extends Storage {
public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
+ if (sd.getCurrentDir() == null) {
+ continue;
+ }
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 9a71081..2153036 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -129,22 +130,31 @@ public class DataStorage extends Storage {
this.datanodeUuid = newDatanodeUuid;
}
- private static boolean createStorageID(StorageDirectory sd, int lv) {
+ private static boolean createStorageID(StorageDirectory sd, int lv,
+ Configuration conf) {
// Clusters previously upgraded from layout versions earlier than
// ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
// new storage ID. We check for that and fix it now.
final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
&& DatanodeStorage.isValidStorageId(sd.getStorageUuid());
- return createStorageID(sd, !haveValidStorageId);
+ return createStorageID(sd, !haveValidStorageId, conf);
}
/** Create an ID for this storage.
* @return true if a new storage ID was generated.
* */
public static boolean createStorageID(
- StorageDirectory sd, boolean regenerateStorageIds) {
+ StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) {
final String oldStorageID = sd.getStorageUuid();
+ if (sd.getStorageLocation() != null &&
+ sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+ // We only support one provided storage per datanode for now.
+ // TODO support multiple provided storage ids per datanode.
+ sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+ DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
+ return false;
+ }
if (oldStorageID == null || regenerateStorageIds) {
sd.setStorageUuid(DatanodeStorage.generateUuid());
LOG.info("Generated new storageID " + sd.getStorageUuid() +
@@ -273,7 +283,7 @@ public class DataStorage extends Storage {
LOG.info("Storage directory with location " + location
+ " is not formatted for namespace " + nsInfo.getNamespaceID()
+ ". Formatting...");
- format(sd, nsInfo, datanode.getDatanodeUuid());
+ format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
break;
default: // recovery part is common
sd.doRecover(curState);
@@ -547,15 +557,15 @@ public class DataStorage extends Storage {
}
void format(StorageDirectory sd, NamespaceInfo nsInfo,
- String datanodeUuid) throws IOException {
+ String newDatanodeUuid, Configuration conf) throws IOException {
sd.clearDirectory(); // create directory
this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
this.clusterID = nsInfo.getClusterID();
this.namespaceID = nsInfo.getNamespaceID();
this.cTime = 0;
- setDatanodeUuid(datanodeUuid);
+ setDatanodeUuid(newDatanodeUuid);
- createStorageID(sd, false);
+ createStorageID(sd, false, conf);
writeProperties(sd);
}
@@ -600,6 +610,9 @@ public class DataStorage extends Storage {
private void setFieldsFromProperties(Properties props, StorageDirectory sd,
boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
+ if (props == null) {
+ return;
+ }
if (overrideLayoutVersion) {
this.layoutVersion = toLayoutVersion;
} else {
@@ -694,6 +707,10 @@ public class DataStorage extends Storage {
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
+ if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+ createStorageID(sd, layoutVersion, conf);
+ return false; // regular start up for PROVIDED storage directories
+ }
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
}
@@ -724,7 +741,7 @@ public class DataStorage extends Storage {
// regular start up.
if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
- createStorageID(sd, layoutVersion);
+ createStorageID(sd, layoutVersion, conf);
return false; // need to write properties
}
@@ -733,7 +750,7 @@ public class DataStorage extends Storage {
if (federationSupported) {
// If the existing on-disk layout version supports federation,
// simply update the properties.
- upgradeProperties(sd);
+ upgradeProperties(sd, conf);
} else {
doUpgradePreFederation(sd, nsInfo, callables, conf);
}
@@ -829,15 +846,16 @@ public class DataStorage extends Storage {
// 4. Write version file under <SD>/current
clusterID = nsInfo.getClusterID();
- upgradeProperties(sd);
+ upgradeProperties(sd, conf);
// 5. Rename <SD>/previous.tmp to <SD>/previous
rename(tmpDir, prevDir);
LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
}
- void upgradeProperties(StorageDirectory sd) throws IOException {
- createStorageID(sd, layoutVersion);
+ void upgradeProperties(StorageDirectory sd, Configuration conf)
+ throws IOException {
+ createStorageID(sd, layoutVersion, conf);
LOG.info("Updating layout version from " + layoutVersion
+ " to " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
+ " for storage " + sd.getRoot());
@@ -989,7 +1007,7 @@ public class DataStorage extends Storage {
// then finalize it. Else finalize the corresponding BP.
for (StorageDirectory sd : getStorageDirs()) {
File prevDir = sd.getPreviousDir();
- if (prevDir.exists()) {
+ if (prevDir != null && prevDir.exists()) {
// data node level storage finalize
doFinalize(sd);
} else {
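The PROVIDED branch of createStorageID above deliberately skips UUID generation and pins the id from configuration; a small illustration of the fallback it relies on:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  class StorageIdSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // with no override, every PROVIDED storage on a datanode reports the
      // same fixed id -- consistent with the one-provided-storage limitation
      String id = conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
          DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
      System.out.println(id); // DS-PROVIDED
    }
  }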
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 18188dd..655b5e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -105,7 +106,7 @@ public class DirectoryScanner implements Runnable {
* @param b whether to retain diffs
*/
@VisibleForTesting
- void setRetainDiffs(boolean b) {
+ public void setRetainDiffs(boolean b) {
retainDiffs = b;
}
@@ -215,7 +216,8 @@ public class DirectoryScanner implements Runnable {
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
- DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
+ public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
+ Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = (int) conf.getTimeDuration(
@@ -369,15 +371,14 @@ public class DirectoryScanner implements Runnable {
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
- void reconcile() throws IOException {
+ public void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> diff = entry.getValue();
for (ScanInfo info : diff) {
- dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
- info.getMetaFile(), info.getVolume());
+ dataset.checkAndUpdate(bpid, info);
}
}
if (!retainDiffs) clear();
@@ -429,11 +430,12 @@ public class DirectoryScanner implements Runnable {
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
- if (info.getBlockFile() == null) {
+ if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
+ info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
- || info.getBlockFileLength() != memBlock.getNumBytes()) {
+ || info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
@@ -609,8 +611,8 @@ public class DirectoryScanner implements Runnable {
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>();
- perfTimer.start();
- throttleTimer.start();
+ perfTimer.reset().start();
+ throttleTimer.reset().start();
try {
result.put(bpid, volume.compileReport(bpid, report, this));
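With the constructor, setRetainDiffs() and reconcile() made public above, a scan can be driven directly from test code. A minimal sketch, assuming datanode, dataset and conf are fixtures initialized elsewhere:

    DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
    scanner.setRetainDiffs(true); // keep the diff map for later inspection
    try {
      // scan() compiles per-volume reports, then each ScanInfo is passed
      // to dataset.checkAndUpdate(bpid, info) to reconcile memory and disk
      scanner.reconcile();
    } finally {
      scanner.shutdown();
    }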
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
new file mode 100644
index 0000000..722d573
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+
+/**
+ * This class is used for provided replicas that are finalized.
+ */
+public class FinalizedProvidedReplica extends ProvidedReplica {
+
+ public FinalizedProvidedReplica(long blockId, URI fileURI,
+ long fileOffset, long blockLen, long genStamp,
+ FsVolumeSpi volume, Configuration conf) {
+ super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
+ }
+
+ @Override
+ public ReplicaState getState() {
+ return ReplicaState.FINALIZED;
+ }
+
+ @Override
+ public long getBytesOnDisk() {
+ return getNumBytes();
+ }
+
+ @Override
+ public long getVisibleLength() {
+ return getNumBytes(); // all bytes are visible
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
+
+ @Override
+ public ReplicaInfo getOriginalReplica() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getOriginalReplica");
+ }
+
+ @Override
+ public long getRecoveryID() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getRecoveryID");
+ }
+
+ @Override
+ public void setRecoveryID(long recoveryId) {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support setRecoveryID");
+ }
+
+ @Override
+ public ReplicaRecoveryInfo createInfo() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support createInfo");
+ }
+}
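For reference, a minimal sketch of constructing a finalized provided replica with the constructor above; volume is assumed to be an FsVolumeSpi whose storage type is PROVIDED, conf an existing Configuration, and every numeric value is illustrative:

    URI remoteFile = URI.create("hdfs://remote-store/data/part-00000");
    FinalizedProvidedReplica replica = new FinalizedProvidedReplica(
        1024L,              // blockId
        remoteFile,         // fileURI the block bytes are read from
        0L,                 // fileOffset of the block within that file
        64L * 1024 * 1024,  // blockLen
        1001L,              // genStamp
        volume,             // FsVolumeSpi this replica belongs to
        conf);              // Configuration used to open the remote FileSystem
    assert replica.getState() == ReplicaState.FINALIZED;
    assert replica.getVisibleLength() == replica.getNumBytes();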
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
new file mode 100644
index 0000000..b021ea2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for replicas whose data is served from an external (provided) store.
+ */
+public abstract class ProvidedReplica extends ReplicaInfo {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ProvidedReplica.class);
+
+ // Null checksum information for provided replicas.
+ // Shared across all replicas.
+ static final byte[] NULL_CHECKSUM_ARRAY =
+ FsDatasetUtil.createNullChecksumByteArray();
+ private URI fileURI;
+ private long fileOffset;
+ private Configuration conf;
+ private FileSystem remoteFS;
+
+ /**
+ * Constructor.
+ * @param blockId block id
+ * @param fileURI remote URI this block is to be read from
+ * @param fileOffset the offset in the remote URI
+ * @param blockLen the length of the block
+ * @param genStamp the generation stamp of the block
+ * @param volume the volume this block belongs to
+ * @param conf the configuration used to obtain the remote FileSystem
+ */
+ public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
+ long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
+ super(volume, blockId, blockLen, genStamp);
+ this.fileURI = fileURI;
+ this.fileOffset = fileOffset;
+ this.conf = conf;
+ try {
+ this.remoteFS = FileSystem.get(fileURI, this.conf);
+ } catch (IOException e) {
+ LOG.warn("Failed to obtain filesystem for " + fileURI);
+ this.remoteFS = null;
+ }
+ }
+
+ public ProvidedReplica(ProvidedReplica r) {
+ super(r);
+ this.fileURI = r.fileURI;
+ this.fileOffset = r.fileOffset;
+ this.conf = r.conf;
+ try {
+ this.remoteFS = FileSystem.newInstance(fileURI, this.conf);
+ } catch (IOException e) {
+ LOG.warn("Failed to obtain filesystem for " + fileURI, e);
+ this.remoteFS = null;
+ }
+ }
+
+ @Override
+ public URI getBlockURI() {
+ return this.fileURI;
+ }
+
+ @Override
+ public InputStream getDataInputStream(long seekOffset) throws IOException {
+ if (remoteFS != null) {
+ FSDataInputStream ins = remoteFS.open(new Path(fileURI));
+ ins.seek(fileOffset + seekOffset);
+ return ins;
+ } else {
+ throw new IOException("Remote filesystem for provided replica " + this +
+ " does not exist");
+ }
+ }
+
+ @Override
+ public OutputStream getDataOutputStream(boolean append) throws IOException {
+ throw new UnsupportedOperationException(
+ "OutputDataStream is not implemented for ProvidedReplica");
+ }
+
+ @Override
+ public URI getMetadataURI() {
+ return null;
+ }
+
+ @Override
+ public OutputStream getMetadataOutputStream(boolean append)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean blockDataExists() {
+ if (remoteFS != null) {
+ try {
+ return remoteFS.exists(new Path(fileURI));
+ } catch (IOException e) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean deleteBlockData() {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not support deleting block data");
+ }
+
+ @Override
+ public long getBlockDataLength() {
+ return this.getNumBytes();
+ }
+
+ @Override
+ public LengthInputStream getMetadataInputStream(long offset)
+ throws IOException {
+ return new LengthInputStream(new ByteArrayInputStream(NULL_CHECKSUM_ARRAY),
+ NULL_CHECKSUM_ARRAY.length);
+ }
+
+ @Override
+ public boolean metadataExists() {
+ return NULL_CHECKSUM_ARRAY != null;
+ }
+
+ @Override
+ public boolean deleteMetadata() {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not support deleting metadata");
+ }
+
+ @Override
+ public long getMetadataLength() {
+ return NULL_CHECKSUM_ARRAY == null ? 0 : NULL_CHECKSUM_ARRAY.length;
+ }
+
+ @Override
+ public boolean renameMeta(URI destURI) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not support renaming metadata");
+ }
+
+ @Override
+ public boolean renameData(URI destURI) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not support renaming data");
+ }
+
+ @Override
+ public boolean getPinning(LocalFileSystem localFS) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setPinning(LocalFileSystem localFS) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not support pinning");
+ }
+
+ @Override
+ public void bumpReplicaGS(long newGS) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support writes");
+ }
+
+ @Override
+ public boolean breakHardLinksIfNeeded() throws IOException {
+ return false;
+ }
+
+ @Override
+ public ReplicaRecoveryInfo createInfo()
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support writes");
+ }
+
+ @Override
+ public int compareWith(ScanInfo info) {
+ // Local scans cannot discover provided blocks; compare against the
+ // block's file region instead.
+ if (info.getFileRegion().equals(
+ new FileRegion(this.getBlockId(), new Path(fileURI),
+ fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
+ return 0;
+ } else {
+ return (int) (info.getBlockLength() - getNumBytes());
+ }
+ }
+
+ @Override
+ public void truncateBlock(long newLength) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support truncate");
+ }
+
+ @Override
+ public void updateWithReplica(StorageLocation replicaLocation) {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support update");
+ }
+
+ @Override
+ public void copyMetadata(URI destination) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support copy metadata");
+ }
+
+ @Override
+ public void copyBlockdata(URI destination) throws IOException {
+ throw new UnsupportedOperationException(
+ "ProvidedReplica does not yet support copy data");
+ }
+}
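A short sketch of the read path this class enables: getDataInputStream() opens the remote file and seeks to fileOffset + seekOffset, so callers consume the block bytes as if they were local (replica is assumed to have been constructed as in the previous file):

    try (InputStream in = replica.getDataInputStream(0)) {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) > 0) {
        // stream the block bytes to the client, verify lengths, etc.
      }
    }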
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index 280aaa0..639467f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -18,9 +18,13 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.net.URI;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
/**
@@ -42,11 +46,20 @@ public class ReplicaBuilder {
private ReplicaInfo fromReplica;
+ private URI uri;
+ private long offset;
+ private Configuration conf;
+ private FileRegion fileRegion;
+
public ReplicaBuilder(ReplicaState state) {
volume = null;
writer = null;
block = null;
length = -1;
+ fileRegion = null;
+ conf = null;
+ fromReplica = null;
+ uri = null;
this.state = state;
}
@@ -105,6 +118,26 @@ public class ReplicaBuilder {
return this;
}
+ public ReplicaBuilder setURI(URI uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ public ReplicaBuilder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public ReplicaBuilder setOffset(long offset) {
+ this.offset = offset;
+ return this;
+ }
+
+ public ReplicaBuilder setFileRegion(FileRegion fileRegion) {
+ this.fileRegion = fileRegion;
+ return this;
+ }
+
public LocalReplicaInPipeline buildLocalReplicaInPipeline()
throws IllegalArgumentException {
LocalReplicaInPipeline info = null;
@@ -176,7 +209,7 @@ public class ReplicaBuilder {
}
}
- private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+ private LocalReplica buildFinalizedReplica() throws IllegalArgumentException {
if (null != fromReplica &&
fromReplica.getState() == ReplicaState.FINALIZED) {
return new FinalizedReplica((FinalizedReplica)fromReplica);
@@ -193,7 +226,7 @@ public class ReplicaBuilder {
}
}
- private ReplicaInfo buildRWR() throws IllegalArgumentException {
+ private LocalReplica buildRWR() throws IllegalArgumentException {
if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
return new ReplicaWaitingToBeRecovered(
@@ -211,7 +244,7 @@ public class ReplicaBuilder {
}
}
- private ReplicaInfo buildRUR() throws IllegalArgumentException {
+ private LocalReplica buildRUR() throws IllegalArgumentException {
if (null == fromReplica) {
throw new IllegalArgumentException(
"Missing a valid replica to recover from");
@@ -228,8 +261,53 @@ public class ReplicaBuilder {
}
}
- public ReplicaInfo build() throws IllegalArgumentException {
- ReplicaInfo info = null;
+ private ProvidedReplica buildProvidedFinalizedReplica()
+ throws IllegalArgumentException {
+ ProvidedReplica info = null;
+ if (fromReplica != null) {
+ throw new IllegalArgumentException("Finalized PROVIDED replica " +
+ "cannot be constructed from another replica");
+ }
+ if (fileRegion == null && uri == null) {
+ throw new IllegalArgumentException(
+ "Trying to construct a provided replica on " + volume +
+ " without enough information");
+ }
+ if (fileRegion == null) {
+ info = new FinalizedProvidedReplica(blockId, uri, offset,
+ length, genStamp, volume, conf);
+ } else {
+ info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
+ fileRegion.getPath().toUri(),
+ fileRegion.getOffset(),
+ fileRegion.getBlock().getNumBytes(),
+ fileRegion.getBlock().getGenerationStamp(),
+ volume, conf);
+ }
+ return info;
+ }
+
+ private ProvidedReplica buildProvidedReplica()
+ throws IllegalArgumentException {
+ ProvidedReplica info = null;
+ switch(this.state) {
+ case FINALIZED:
+ info = buildProvidedFinalizedReplica();
+ break;
+ case RWR:
+ case RUR:
+ case RBW:
+ case TEMPORARY:
+ default:
+ throw new IllegalArgumentException("Unknown replica state " +
+ state + " for PROVIDED replica");
+ }
+ return info;
+ }
+
+ private LocalReplica buildLocalReplica()
+ throws IllegalArgumentException {
+ LocalReplica info = null;
switch(this.state) {
case FINALIZED:
info = buildFinalizedReplica();
@@ -249,4 +327,16 @@ public class ReplicaBuilder {
}
return info;
}
+
+ public ReplicaInfo build() throws IllegalArgumentException {
+
+ ReplicaInfo info = null;
+ if (volume != null && volume.getStorageType() == StorageType.PROVIDED) {
+ info = buildProvidedReplica();
+ } else {
+ info = buildLocalReplica();
+ }
+
+ return info;
+ }
}
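A sketch of the new build path: with a PROVIDED volume, build() dispatches to buildProvidedReplica(), so ReplicaState.FINALIZED plus either a URI/offset or a FileRegion yields a FinalizedProvidedReplica. Here providedVolume and conf are assumed initialized, and setBlockId/setLength/setGenerationStamp/setFsVolume are the builder's pre-existing setters:

    ReplicaInfo info = new ReplicaBuilder(ReplicaState.FINALIZED)
        .setBlockId(1024L)
        .setLength(64L * 1024 * 1024)
        .setGenerationStamp(1001L)
        .setURI(URI.create("hdfs://remote-store/data/part-00000"))
        .setOffset(0L)
        .setConf(conf)
        .setFsVolume(providedVolume) // getStorageType() == StorageType.PROVIDED
        .build();                    // yields a FinalizedProvidedReplica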
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 65e9ba7..3718799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -50,6 +50,17 @@ abstract public class ReplicaInfo extends Block
new FileIoProvider(null, null);
/**
+ * Constructor.
+ * @param block a block
+ * @param vol volume where replica is located
+ */
+ ReplicaInfo(Block block, FsVolumeSpi vol) {
+ this(vol, block.getBlockId(), block.getNumBytes(),
+ block.getGenerationStamp());
+ }
+
+ /**
* Constructor
* @param vol volume where replica is located
* @param blockId block id
@@ -62,7 +73,14 @@ abstract public class ReplicaInfo extends Block
}
/**
- * Get the volume where this replica is located on disk.
+ * Copy constructor.
+ * @param from where to copy from
+ */
+ ReplicaInfo(ReplicaInfo from) {
+ this(from, from.getVolume());
+ }
+
+ /**
* @return the volume where this replica is located on disk
*/
public FsVolumeSpi getVolume() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index b4d5794..fb7acfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -98,6 +98,16 @@ public class StorageLocation
public boolean matchesStorageDirectory(StorageDirectory sd,
String bpid) throws IOException {
+ if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED &&
+ storageType == StorageType.PROVIDED) {
+ return matchesStorageDirectory(sd);
+ }
+ if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
+ storageType == StorageType.PROVIDED) {
+ // Only one of the two is PROVIDED, so they cannot match.
+ return false;
+ }
+ // Both storage directories are local.
return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
.equals(sd.getRoot().toURI().normalize());
}
@@ -197,6 +207,10 @@ public class StorageLocation
if (conf == null) {
conf = new HdfsConfiguration();
}
+ if (storageType == StorageType.PROVIDED) {
+ // Skip directory creation when the storage type is PROVIDED.
+ return;
+ }
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(conf.get(
@@ -213,10 +227,14 @@ public class StorageLocation
@Override // Checkable
public VolumeCheckResult check(CheckContext context) throws IOException {
- DiskChecker.checkDir(
- context.localFileSystem,
- new Path(baseURI),
- context.expectedPermission);
+ // We assume provided storage locations are always healthy
+ // and run the disk check only for local storage.
+ if (storageType != StorageType.PROVIDED) {
+ DiskChecker.checkDir(
+ context.localFileSystem,
+ new Path(baseURI),
+ context.expectedPermission);
+ }
return VolumeCheckResult.HEALTHY;
}
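In practice these guards mean a PROVIDED location parses like any other typed location but is exempt from local directory creation and disk checking. A small sketch, assuming the pre-existing [TYPE]URI parse form:

    StorageLocation loc =
        StorageLocation.parse("[PROVIDED]hdfs://remote-store/");
    assert loc.getStorageType() == StorageType.PROVIDED;
    // check() skips DiskChecker for this location and reports HEALTHY.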
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68e3c683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d7e29cf..5a40847 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -252,8 +253,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* and, in case that they are not matched, update the record or mark it
* as corrupted.
*/
- void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) throws IOException;
+ void checkAndUpdate(String bpid, ScanInfo info) throws IOException;
/**
* @param b - the block