You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/12/15 18:39:10 UTC
[24/50] [abbrv] hadoop git commit: HDFS-11902. [READ] Merge BlockFormatProvider and FileRegionProvider.
HDFS-11902. [READ] Merge BlockFormatProvider and FileRegionProvider.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ae7ea55
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ae7ea55
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ae7ea55
Branch: refs/heads/HDFS-9806
Commit: 5ae7ea559f0a8a10f13b5d3ee7704f3cc78a2ab7
Parents: 9286e66
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Fri Nov 3 13:45:56 2017 -0700
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Fri Dec 15 10:18:27 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 17 +-
.../blockmanagement/BlockFormatProvider.java | 91 ----
.../server/blockmanagement/BlockProvider.java | 75 ----
.../blockmanagement/ProvidedStorageMap.java | 63 ++-
.../hadoop/hdfs/server/common/BlockFormat.java | 82 ----
.../hdfs/server/common/FileRegionProvider.java | 37 --
.../server/common/TextFileRegionFormat.java | 442 ------------------
.../server/common/TextFileRegionProvider.java | 88 ----
.../common/blockaliasmap/BlockAliasMap.java | 88 ++++
.../impl/TextFileRegionAliasMap.java | 445 +++++++++++++++++++
.../common/blockaliasmap/package-info.java | 27 ++
.../fsdataset/impl/ProvidedVolumeImpl.java | 76 ++--
.../src/main/resources/hdfs-default.xml | 34 +-
.../blockmanagement/TestProvidedStorageMap.java | 41 +-
.../hdfs/server/common/TestTextBlockFormat.java | 160 -------
.../impl/TestTextBlockAliasMap.java | 161 +++++++
.../fsdataset/impl/TestProvidedImpl.java | 75 ++--
.../hdfs/server/namenode/FileSystemImage.java | 4 +-
.../hdfs/server/namenode/ImageWriter.java | 25 +-
.../hdfs/server/namenode/NullBlockAliasMap.java | 86 ++++
.../hdfs/server/namenode/NullBlockFormat.java | 87 ----
.../hadoop/hdfs/server/namenode/TreePath.java | 8 +-
.../TestNameNodeProvidedImplementation.java | 25 +-
23 files changed, 994 insertions(+), 1243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7449987..cb57675 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -331,22 +331,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;
- public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class";
-
- public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
- public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class";
+ public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class";
- public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter";
- public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER = "dfs.provided.aliasmap.text.delimiter";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT = ",";
- public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path";
- public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH = "dfs.provided.aliasmap.text.read.path";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT = "file:///tmp/blocks.csv";
- public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec";
- public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec";
+ public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path";
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
deleted file mode 100644
index 930263d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.BlockAlias;
-import org.apache.hadoop.hdfs.server.common.BlockFormat;
-import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Loads provided blocks from a {@link BlockFormat}.
- */
-public class BlockFormatProvider extends BlockProvider
- implements Configurable {
-
- private Configuration conf;
- private BlockFormat<? extends BlockAlias> blockFormat;
- public static final Logger LOG =
- LoggerFactory.getLogger(BlockFormatProvider.class);
-
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void setConf(Configuration conf) {
- Class<? extends BlockFormat> c = conf.getClass(
- DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
- TextFileRegionFormat.class, BlockFormat.class);
- blockFormat = ReflectionUtils.newInstance(c, conf);
- LOG.info("Loaded BlockFormat class : " + c.getClass().getName());
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public Iterator<Block> iterator() {
- try {
- final BlockFormat.Reader<? extends BlockAlias> reader =
- blockFormat.getReader(null);
-
- return new Iterator<Block>() {
-
- private final Iterator<? extends BlockAlias> inner = reader.iterator();
-
- @Override
- public boolean hasNext() {
- return inner.hasNext();
- }
-
- @Override
- public Block next() {
- return inner.next().getBlock();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- } catch (IOException e) {
- throw new RuntimeException("Failed to read provided blocks", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
deleted file mode 100644
index 2214868..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.io.IOException;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.util.RwLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to load provided blocks in the {@link BlockManager}.
- */
-public abstract class BlockProvider implements Iterable<Block> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ProvidedStorageMap.class);
-
- private RwLock lock;
- private BlockManager bm;
- private DatanodeStorageInfo storage;
- private boolean hasDNs = false;
-
- /**
- * @param lock the namesystem lock
- * @param bm block manager
- * @param storage storage for provided blocks
- */
- void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
- this.bm = bm;
- this.lock = lock;
- this.storage = storage;
- }
-
- /**
- * start the processing of block report for provided blocks.
- * @throws IOException
- */
- void start(BlockReportContext context) throws IOException {
- assert lock.hasWriteLock() : "Not holding write lock";
- if (hasDNs) {
- return;
- }
- if (storage.getBlockReportCount() == 0) {
- LOG.info("Calling process first blk report from storage: " + storage);
- // first pass; periodic refresh should call bm.processReport
- bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
- } else {
- bm.processReport(storage, new ProvidedBlockList(iterator()), context);
- }
- hasDNs = true;
- }
-
- void stop() {
- assert lock.hasWriteLock() : "Not holding write lock";
- hasDNs = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 5717e0c..a848d50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.common.BlockAlias;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.util.RwLock;
@@ -61,7 +64,11 @@ public class ProvidedStorageMap {
LoggerFactory.getLogger(ProvidedStorageMap.class);
// limit to a single provider for now
- private final BlockProvider blockProvider;
+ private RwLock lock;
+ private BlockManager bm;
+ private boolean hasDNs = false;
+ private BlockAliasMap aliasMap;
+
private final String storageId;
private final ProvidedDescriptor providedDescriptor;
private final DatanodeStorageInfo providedStorageInfo;
@@ -79,7 +86,7 @@ public class ProvidedStorageMap {
if (!providedEnabled) {
// disable mapping
- blockProvider = null;
+ aliasMap = null;
providedDescriptor = null;
providedStorageInfo = null;
return;
@@ -90,15 +97,17 @@ public class ProvidedStorageMap {
providedDescriptor = new ProvidedDescriptor();
providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
+ this.bm = bm;
+ this.lock = lock;
+
// load block reader into storage
- Class<? extends BlockProvider> fmt = conf.getClass(
- DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
- BlockFormatProvider.class, BlockProvider.class);
-
- blockProvider = ReflectionUtils.newInstance(fmt, conf);
- blockProvider.init(lock, bm, providedStorageInfo);
- LOG.info("Loaded block provider class: " +
- blockProvider.getClass() + " storage: " + providedStorageInfo);
+ Class<? extends BlockAliasMap> aliasMapClass = conf.getClass(
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
+ TextFileRegionAliasMap.class, BlockAliasMap.class);
+ aliasMap = ReflectionUtils.newInstance(aliasMapClass, conf);
+
+ LOG.info("Loaded alias map class: " +
+ aliasMap.getClass() + " storage: " + providedStorageInfo);
}
/**
@@ -114,8 +123,7 @@ public class ProvidedStorageMap {
BlockReportContext context) throws IOException {
if (providedEnabled && storageId.equals(s.getStorageID())) {
if (StorageType.PROVIDED.equals(s.getStorageType())) {
- // poll service, initiate
- blockProvider.start(context);
+ processProvidedStorageReport(context);
dn.injectStorage(providedStorageInfo);
return providedDescriptor.getProvidedStorage(dn, s);
}
@@ -124,6 +132,26 @@ public class ProvidedStorageMap {
return dn.getStorageInfo(s.getStorageID());
}
+ private void processProvidedStorageReport(BlockReportContext context)
+ throws IOException {
+ assert lock.hasWriteLock() : "Not holding write lock";
+ if (hasDNs) {
+ return;
+ }
+ if (providedStorageInfo.getBlockReportCount() == 0) {
+ LOG.info("Calling process first blk report from storage: "
+ + providedStorageInfo);
+ // first pass; periodic refresh should call bm.processReport
+ bm.processFirstBlockReport(providedStorageInfo,
+ new ProvidedBlockList(aliasMap.getReader(null).iterator()));
+ } else {
+ bm.processReport(providedStorageInfo,
+ new ProvidedBlockList(aliasMap.getReader(null).iterator()),
+ context);
+ }
+ hasDNs = true;
+ }
+
@VisibleForTesting
public DatanodeStorageInfo getProvidedStorageInfo() {
return providedStorageInfo;
@@ -137,10 +165,11 @@ public class ProvidedStorageMap {
}
public void removeDatanode(DatanodeDescriptor dnToRemove) {
- if (providedDescriptor != null) {
+ if (providedEnabled) {
+ assert lock.hasWriteLock() : "Not holding write lock";
int remainingDatanodes = providedDescriptor.remove(dnToRemove);
if (remainingDatanodes == 0) {
- blockProvider.stop();
+ hasDNs = false;
}
}
}
@@ -443,9 +472,9 @@ public class ProvidedStorageMap {
*/
static class ProvidedBlockList extends BlockListAsLongs {
- private final Iterator<Block> inner;
+ private final Iterator<BlockAlias> inner;
- ProvidedBlockList(Iterator<Block> inner) {
+ ProvidedBlockList(Iterator<BlockAlias> inner) {
this.inner = inner;
}
@@ -454,7 +483,7 @@ public class ProvidedStorageMap {
return new Iterator<BlockReportReplica>() {
@Override
public BlockReportReplica next() {
- return new BlockReportReplica(inner.next());
+ return new BlockReportReplica(inner.next().getBlock());
}
@Override
public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
deleted file mode 100644
index 66e7fdf..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-
-/**
- * An abstract class used to read and write block maps for provided blocks.
- */
-public abstract class BlockFormat<T extends BlockAlias> {
-
- /**
- * An abstract class that is used to read {@link BlockAlias}es
- * for provided blocks.
- */
- public static abstract class Reader<U extends BlockAlias>
- implements Iterable<U>, Closeable {
-
- /**
- * reader options.
- */
- public interface Options { }
-
- public abstract U resolve(Block ident) throws IOException;
-
- }
-
- /**
- * Returns the reader for the provided block map.
- * @param opts reader options
- * @return {@link Reader} to the block map.
- * @throws IOException
- */
- public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
-
- /**
- * An abstract class used as a writer for the provided block map.
- */
- public static abstract class Writer<U extends BlockAlias>
- implements Closeable {
- /**
- * writer options.
- */
- public interface Options { }
-
- public abstract void store(U token) throws IOException;
-
- }
-
- /**
- * Returns the writer for the provided block map.
- * @param opts writer options.
- * @return {@link Writer} to the block map.
- * @throws IOException
- */
- public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
-
- /**
- * Refresh based on the underlying block map.
- * @throws IOException
- */
- public abstract void refresh() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
deleted file mode 100644
index 2e94239..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
- * This class is a stub for reading file regions from the block map.
- */
-public class FileRegionProvider implements Iterable<FileRegion> {
- @Override
- public Iterator<FileRegion> iterator() {
- return Collections.emptyListIterator();
- }
-
- public void refresh() throws IOException {
- return;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
deleted file mode 100644
index eacd08f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Collections;
-import java.util.IdentityHashMap;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This class is used for block maps stored as text files,
- * with a specified delimiter.
- */
-public class TextFileRegionFormat
- extends BlockFormat<FileRegion> implements Configurable {
-
- private Configuration conf;
- private ReaderOptions readerOpts = TextReader.defaults();
- private WriterOptions writerOpts = TextWriter.defaults();
-
- public static final Logger LOG =
- LoggerFactory.getLogger(TextFileRegionFormat.class);
- @Override
- public void setConf(Configuration conf) {
- readerOpts.setConf(conf);
- writerOpts.setConf(conf);
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public Reader<FileRegion> getReader(Reader.Options opts)
- throws IOException {
- if (null == opts) {
- opts = readerOpts;
- }
- if (!(opts instanceof ReaderOptions)) {
- throw new IllegalArgumentException("Invalid options " + opts.getClass());
- }
- ReaderOptions o = (ReaderOptions) opts;
- Configuration readerConf = (null == o.getConf())
- ? new Configuration()
- : o.getConf();
- return createReader(o.file, o.delim, readerConf);
- }
-
- @VisibleForTesting
- TextReader createReader(Path file, String delim, Configuration cfg)
- throws IOException {
- FileSystem fs = file.getFileSystem(cfg);
- if (fs instanceof LocalFileSystem) {
- fs = ((LocalFileSystem)fs).getRaw();
- }
- CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
- CompressionCodec codec = factory.getCodec(file);
- return new TextReader(fs, file, codec, delim);
- }
-
- @Override
- public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
- if (null == opts) {
- opts = writerOpts;
- }
- if (!(opts instanceof WriterOptions)) {
- throw new IllegalArgumentException("Invalid options " + opts.getClass());
- }
- WriterOptions o = (WriterOptions) opts;
- Configuration cfg = (null == o.getConf())
- ? new Configuration()
- : o.getConf();
- if (o.codec != null) {
- CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
- CompressionCodec codec = factory.getCodecByName(o.codec);
- String name = o.file.getName() + codec.getDefaultExtension();
- o.filename(new Path(o.file.getParent(), name));
- return createWriter(o.file, codec, o.delim, cfg);
- }
- return createWriter(o.file, null, o.delim, conf);
- }
-
- @VisibleForTesting
- TextWriter createWriter(Path file, CompressionCodec codec, String delim,
- Configuration cfg) throws IOException {
- FileSystem fs = file.getFileSystem(cfg);
- if (fs instanceof LocalFileSystem) {
- fs = ((LocalFileSystem)fs).getRaw();
- }
- OutputStream tmp = fs.create(file);
- java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
- (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
- return new TextWriter(out, delim);
- }
-
- /**
- * Class specifying reader options for the {@link TextFileRegionFormat}.
- */
- public static class ReaderOptions
- implements TextReader.Options, Configurable {
-
- private Configuration conf;
- private String delim =
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
- private Path file = new Path(
- new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
- .toURI().toString());
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
- file = new Path(tmpfile);
- delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
- LOG.info("TextFileRegionFormat: read path " + tmpfile.toString());
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public ReaderOptions filename(Path file) {
- this.file = file;
- return this;
- }
-
- @Override
- public ReaderOptions delimiter(String delim) {
- this.delim = delim;
- return this;
- }
- }
-
- /**
- * Class specifying writer options for the {@link TextFileRegionFormat}.
- */
- public static class WriterOptions
- implements TextWriter.Options, Configurable {
-
- private Configuration conf;
- private String codec = null;
- private Path file =
- new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
- private String delim =
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- String tmpfile = conf.get(
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
- file = new Path(tmpfile);
- codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
- delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
- DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public WriterOptions filename(Path file) {
- this.file = file;
- return this;
- }
-
- public String getCodec() {
- return codec;
- }
-
- public Path getFile() {
- return file;
- }
-
- @Override
- public WriterOptions codec(String codec) {
- this.codec = codec;
- return this;
- }
-
- @Override
- public WriterOptions delimiter(String delim) {
- this.delim = delim;
- return this;
- }
-
- }
-
- /**
- * This class is used as a reader for block maps which
- * are stored as delimited text files.
- */
- public static class TextReader extends Reader<FileRegion> {
-
- /**
- * Options for {@link TextReader}.
- */
- public interface Options extends Reader.Options {
- Options filename(Path file);
- Options delimiter(String delim);
- }
-
- static ReaderOptions defaults() {
- return new ReaderOptions();
- }
-
- private final Path file;
- private final String delim;
- private final FileSystem fs;
- private final CompressionCodec codec;
- private final Map<FRIterator, BufferedReader> iterators;
-
- protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
- String delim) {
- this(fs, file, codec, delim,
- new IdentityHashMap<FRIterator, BufferedReader>());
- }
-
- TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
- Map<FRIterator, BufferedReader> iterators) {
- this.fs = fs;
- this.file = file;
- this.codec = codec;
- this.delim = delim;
- this.iterators = Collections.synchronizedMap(iterators);
- }
-
- @Override
- public FileRegion resolve(Block ident) throws IOException {
- // consider layering index w/ composable format
- Iterator<FileRegion> i = iterator();
- try {
- while (i.hasNext()) {
- FileRegion f = i.next();
- if (f.getBlock().equals(ident)) {
- return f;
- }
- }
- } finally {
- BufferedReader r = iterators.remove(i);
- if (r != null) {
- // null on last element
- r.close();
- }
- }
- return null;
- }
-
- class FRIterator implements Iterator<FileRegion> {
-
- private FileRegion pending;
-
- @Override
- public boolean hasNext() {
- return pending != null;
- }
-
- @Override
- public FileRegion next() {
- if (null == pending) {
- throw new NoSuchElementException();
- }
- FileRegion ret = pending;
- try {
- pending = nextInternal(this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return ret;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
- BufferedReader r = iterators.get(i);
- if (null == r) {
- throw new IllegalStateException();
- }
- String line = r.readLine();
- if (null == line) {
- iterators.remove(i);
- return null;
- }
- String[] f = line.split(delim);
- if (f.length != 6) {
- throw new IOException("Invalid line: " + line);
- }
- return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
- Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
- Long.parseLong(f[4]));
- }
-
- public InputStream createStream() throws IOException {
- InputStream i = fs.open(file);
- if (codec != null) {
- i = codec.createInputStream(i);
- }
- return i;
- }
-
- @Override
- public Iterator<FileRegion> iterator() {
- FRIterator i = new FRIterator();
- try {
- BufferedReader r =
- new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
- iterators.put(i, r);
- i.pending = nextInternal(i);
- } catch (IOException e) {
- iterators.remove(i);
- throw new RuntimeException(e);
- }
- return i;
- }
-
- @Override
- public void close() throws IOException {
- ArrayList<IOException> ex = new ArrayList<>();
- synchronized (iterators) {
- for (Iterator<BufferedReader> i = iterators.values().iterator();
- i.hasNext();) {
- try {
- BufferedReader r = i.next();
- r.close();
- } catch (IOException e) {
- ex.add(e);
- } finally {
- i.remove();
- }
- }
- iterators.clear();
- }
- if (!ex.isEmpty()) {
- throw MultipleIOException.createIOException(ex);
- }
- }
-
- }
-
- /**
- * This class is used as a writer for block maps which
- * are stored as delimited text files.
- */
- public static class TextWriter extends Writer<FileRegion> {
-
- /**
- * Interface for Writer options.
- */
- public interface Options extends Writer.Options {
- Options codec(String codec);
- Options filename(Path file);
- Options delimiter(String delim);
- }
-
- public static WriterOptions defaults() {
- return new WriterOptions();
- }
-
- private final String delim;
- private final java.io.Writer out;
-
- public TextWriter(java.io.Writer out, String delim) {
- this.out = out;
- this.delim = delim;
- }
-
- @Override
- public void store(FileRegion token) throws IOException {
- out.append(String.valueOf(token.getBlock().getBlockId())).append(delim);
- out.append(token.getPath().toString()).append(delim);
- out.append(Long.toString(token.getOffset())).append(delim);
- out.append(Long.toString(token.getLength())).append(delim);
- out.append(Long.toString(token.getGenerationStamp())).append(delim);
- out.append(token.getBlockPoolId()).append("\n");
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-
- }
-
- @Override
- public void refresh() throws IOException {
- //nothing to do;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
deleted file mode 100644
index 0fa667e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * This class is used to read file regions from block maps
- * specified using delimited text.
- */
-public class TextFileRegionProvider
- extends FileRegionProvider implements Configurable {
-
- private Configuration conf;
- private BlockFormat<FileRegion> fmt;
-
- @SuppressWarnings("unchecked")
- @Override
- public void setConf(Configuration conf) {
- fmt = ReflectionUtils.newInstance(
- conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
- TextFileRegionFormat.class,
- BlockFormat.class),
- conf);
- ((Configurable)fmt).setConf(conf); //redundant?
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public Iterator<FileRegion> iterator() {
- try {
- final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
- return new Iterator<FileRegion>() {
-
- private final Iterator<FileRegion> inner = r.iterator();
-
- @Override
- public boolean hasNext() {
- return inner.hasNext();
- }
-
- @Override
- public FileRegion next() {
- return inner.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- } catch (IOException e) {
- throw new RuntimeException("Failed to read provided blocks", e);
- }
- }
-
- @Override
- public void refresh() throws IOException {
- fmt.refresh();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
new file mode 100644
index 0000000..d276fb5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.blockaliasmap;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.BlockAlias;
+
+/**
+ * An abstract class used to read and write block maps for provided blocks.
+ * A block map associates a {@link Block} with a {@link BlockAlias} of type
+ * {@code T}; concrete subclasses decide how that association is stored.
+ *
+ * @param <T> the kind of {@link BlockAlias} stored in this map.
+ */
+public abstract class BlockAliasMap<T extends BlockAlias> {
+
+ /**
+ * An abstract class that is used to read {@link BlockAlias}es
+ * for provided blocks. A reader can be iterated to enumerate all aliases
+ * and must be closed when no longer needed.
+ *
+ * @param <U> the kind of {@link BlockAlias} produced by this reader.
+ */
+ public static abstract class Reader<U extends BlockAlias>
+ implements Iterable<U>, Closeable {
+
+ /**
+ * Marker interface for reader options; implementations define their own
+ * settings.
+ */
+ public interface Options { }
+
+ /**
+ * Looks up the alias of a single block.
+ *
+ * @param ident block to resolve
+ * @return BlockAlias corresponding to the provided block.
+ * NOTE(review): the text-file reader in this package returns
+ * {@code null} when no entry matches; confirm whether that is the
+ * contract for all implementations.
+ * @throws IOException if the underlying map cannot be read.
+ */
+ public abstract U resolve(Block ident) throws IOException;
+
+ }
+
+ /**
+ * Returns a reader to the alias map.
+ * @param opts reader options
+ * @return {@link Reader} to the alias map.
+ * @throws IOException if the reader cannot be created.
+ */
+ public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
+
+ /**
+ * An abstract class used as a writer for the provided block map.
+ * A writer must be closed when no more aliases will be stored.
+ *
+ * @param <U> the kind of {@link BlockAlias} accepted by this writer.
+ */
+ public static abstract class Writer<U extends BlockAlias>
+ implements Closeable {
+ /**
+ * Marker interface for writer options; implementations define their own
+ * settings.
+ */
+ public interface Options { }
+
+ /**
+ * Adds one alias to the block map.
+ *
+ * @param token the alias to store.
+ * @throws IOException if the alias cannot be written.
+ */
+ public abstract void store(U token) throws IOException;
+
+ }
+
+ /**
+ * Returns the writer for the alias map.
+ * @param opts writer options.
+ * @return {@link Writer} to the alias map.
+ * @throws IOException if the writer cannot be created.
+ */
+ public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
+
+ /**
+ * Refresh the alias map.
+ * @throws IOException if the refresh fails.
+ */
+ public abstract void refresh() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
new file mode 100644
index 0000000..80f48c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for block maps stored as text files,
+ * with a specified delimiter.
+ */
+public class TextFileRegionAliasMap
+ extends BlockAliasMap<FileRegion> implements Configurable {
+
+ private Configuration conf;
+ private ReaderOptions readerOpts = TextReader.defaults();
+ private WriterOptions writerOpts = TextWriter.defaults();
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TextFileRegionAliasMap.class);
+ /**
+ * Stores the configuration and forwards it to the default reader and
+ * writer options, which read their settings from it.
+ *
+ * @param conf configuration to apply.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ readerOpts.setConf(conf);
+ writerOpts.setConf(conf);
+ this.conf = conf;
+ }
+
+ /**
+ * @return the configuration last passed to {@link #setConf}, or
+ * {@code null} if none was set.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Creates a reader over the text file described by {@code opts}.
+ *
+ * @param opts reader options; {@code null} selects the options configured
+ * on this alias map.
+ * @return a reader for the configured file and delimiter.
+ * @throws IllegalArgumentException if {@code opts} is not a
+ * {@link ReaderOptions}.
+ * @throws IOException if the reader cannot be created.
+ */
+ @Override
+ public Reader<FileRegion> getReader(Reader.Options opts)
+ throws IOException {
+ final Reader.Options effective = (opts != null) ? opts : readerOpts;
+ if (!(effective instanceof ReaderOptions)) {
+ throw new IllegalArgumentException(
+ "Invalid options " + effective.getClass());
+ }
+ final ReaderOptions textOpts = (ReaderOptions) effective;
+ // Options that were never configured fall back to a fresh Configuration.
+ Configuration readerConf = textOpts.getConf();
+ if (readerConf == null) {
+ readerConf = new Configuration();
+ }
+ return createReader(textOpts.file, textOpts.delim, readerConf);
+ }
+
+ @VisibleForTesting
+ TextReader createReader(Path file, String delim, Configuration cfg)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(cfg);
+ if (fs instanceof LocalFileSystem) {
+ fs = ((LocalFileSystem)fs).getRaw();
+ }
+ CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+ CompressionCodec codec = factory.getCodec(file);
+ return new TextReader(fs, file, codec, delim);
+ }
+
+ /**
+ * Creates a writer for the text file described by {@code opts}.
+ * When a codec is configured, the codec's default extension is appended to
+ * the file name and the options are updated to the resulting path.
+ *
+ * @param opts writer options; {@code null} selects the options configured
+ * on this alias map.
+ * @return a writer for the configured file, codec and delimiter.
+ * @throws IllegalArgumentException if {@code opts} is not a
+ * {@link WriterOptions} or names a codec that cannot be found.
+ * @throws IOException if the output file cannot be created.
+ */
+ @Override
+ public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
+ if (null == opts) {
+ opts = writerOpts;
+ }
+ if (!(opts instanceof WriterOptions)) {
+ throw new IllegalArgumentException("Invalid options " + opts.getClass());
+ }
+ WriterOptions o = (WriterOptions) opts;
+ Configuration cfg = (null == o.getConf())
+ ? new Configuration()
+ : o.getConf();
+ if (o.codec == null) {
+ // Use the resolved configuration, exactly like the codec branch; the
+ // instance field is null until setConf has been called.
+ return createWriter(o.file, null, o.delim, cfg);
+ }
+ CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+ CompressionCodec codec = factory.getCodecByName(o.codec);
+ if (codec == null) {
+ throw new IllegalArgumentException("Unknown codec " + o.codec);
+ }
+ String ext = codec.getDefaultExtension();
+ // The options object is reused across calls; only append the extension
+ // once so repeated calls keep writing to the same file.
+ if (!o.file.getName().endsWith(ext)) {
+ o.filename(new Path(o.file.getParent(), o.file.getName() + ext));
+ }
+ return createWriter(o.file, codec, o.delim, cfg);
+ }
+
+ /**
+ * Builds the {@link TextWriter} for a file, optionally compressing the
+ * output with {@code codec}. Text is encoded as UTF-8.
+ *
+ * @param file destination file.
+ * @param codec compression codec, or {@code null} for plain text.
+ * @param delim field delimiter.
+ * @param cfg configuration used to resolve the file system.
+ * @return writer appending to {@code file}.
+ * @throws IOException if the file or the compression stream cannot be
+ * created.
+ */
+ @VisibleForTesting
+ TextWriter createWriter(Path file, CompressionCodec codec, String delim,
+ Configuration cfg) throws IOException {
+ FileSystem fs = file.getFileSystem(cfg);
+ if (fs instanceof LocalFileSystem) {
+ fs = ((LocalFileSystem)fs).getRaw();
+ }
+ OutputStream tmp = fs.create(file);
+ try {
+ java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
+ (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
+ return new TextWriter(out, delim);
+ } catch (IOException | RuntimeException e) {
+ // Nothing owns the freshly created stream yet; release it here so a
+ // failure while wrapping it does not leak the open file.
+ try {
+ tmp.close();
+ } catch (IOException closeFailure) {
+ e.addSuppressed(closeFailure);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Reader-side settings for the {@link TextFileRegionAliasMap}: the text
+ * file to read and the delimiter separating its fields.
+ */
+ public static class ReaderOptions
+ implements TextReader.Options, Configurable {
+
+ private Configuration conf;
+ private String delim =
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
+ private Path file = new Path(
+ new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT).toURI()
+ .toString());
+
+ /**
+ * Loads the read path and the delimiter from {@code conf}, using the
+ * defaults from {@link DFSConfigKeys} for unset keys.
+ *
+ * @param conf configuration to read the settings from.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ final String readPath = conf.get(
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH,
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
+ this.file = new Path(readPath);
+ this.delim = conf.get(
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
+ LOG.info("TextFileRegionAliasMap: read path " + readPath);
+ }
+
+ /**
+ * @return the configuration last applied, or {@code null} if none.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * @param file text file to read the block map from.
+ * @return this object, for chaining.
+ */
+ @Override
+ public ReaderOptions filename(Path file) {
+ this.file = file;
+ return this;
+ }
+
+ /**
+ * @param delim delimiter separating the fields of a line.
+ * @return this object, for chaining.
+ */
+ @Override
+ public ReaderOptions delimiter(String delim) {
+ this.delim = delim;
+ return this;
+ }
+ }
+
+ /**
+ * Writer-side settings for the {@link TextFileRegionAliasMap}: the text
+ * file to write, the field delimiter and an optional compression codec.
+ */
+ public static class WriterOptions
+ implements TextWriter.Options, Configurable {
+
+ private Configuration conf;
+ private String codec = null;
+ private Path file =
+ new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
+ private String delim =
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT;
+
+ /**
+ * Loads the write path, codec name and delimiter from {@code conf}.
+ * An unset write path keeps the path currently held by these options.
+ *
+ * @param conf configuration to read the settings from.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ final String writePath = conf.get(
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH,
+ this.file.toString());
+ this.file = new Path(writePath);
+ this.codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC);
+ this.delim = conf.get(
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER,
+ DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT);
+ }
+
+ /**
+ * @return the configuration last applied, or {@code null} if none.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * @param file text file to write the block map to.
+ * @return this object, for chaining.
+ */
+ @Override
+ public WriterOptions filename(Path file) {
+ this.file = file;
+ return this;
+ }
+
+ /**
+ * @return the codec name, or {@code null} when output is not compressed.
+ */
+ public String getCodec() {
+ return this.codec;
+ }
+
+ /**
+ * @return the file the block map is written to.
+ */
+ public Path getFile() {
+ return this.file;
+ }
+
+ /**
+ * @param codec name of the compression codec; {@code null} disables
+ * compression.
+ * @return this object, for chaining.
+ */
+ @Override
+ public WriterOptions codec(String codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ /**
+ * @param delim delimiter separating the fields of a line.
+ * @return this object, for chaining.
+ */
+ @Override
+ public WriterOptions delimiter(String delim) {
+ this.delim = delim;
+ return this;
+ }
+
+ }
+
+ /**
+ * This class is used as a reader for block maps which
+ * are stored as delimited text files.
+ * <p>
+ * Every iterator handed out by {@link #iterator()} owns one open
+ * {@link BufferedReader}. The pair is tracked in {@code iterators} so that
+ * {@link #close()} can release the readers of iterators that were never
+ * drained; a reader is closed as soon as its iterator reaches the end of
+ * the file.
+ */
+ public static class TextReader extends Reader<FileRegion> {
+
+ /**
+ * Options for {@link TextReader}.
+ */
+ public interface Options extends Reader.Options {
+ Options filename(Path file);
+ Options delimiter(String delim);
+ }
+
+ static ReaderOptions defaults() {
+ return new ReaderOptions();
+ }
+
+ private final Path file;
+ private final String delim;
+ private final FileSystem fs;
+ private final CompressionCodec codec;
+ // Open reader of every live iterator; wrapped in a synchronized map.
+ private final Map<FRIterator, BufferedReader> iterators;
+
+ protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
+ String delim) {
+ this(fs, file, codec, delim,
+ new IdentityHashMap<FRIterator, BufferedReader>());
+ }
+
+ TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
+ Map<FRIterator, BufferedReader> iterators) {
+ this.fs = fs;
+ this.file = file;
+ this.codec = codec;
+ this.delim = delim;
+ this.iterators = Collections.synchronizedMap(iterators);
+ }
+
+ /**
+ * Scans the file from the start for the entry of {@code ident}.
+ *
+ * @param ident block to resolve.
+ * @return the matching {@link FileRegion}, or {@code null} if the file
+ * holds no entry for the block.
+ * @throws IOException if closing the underlying reader fails.
+ */
+ @Override
+ public FileRegion resolve(Block ident) throws IOException {
+ // consider layering index w/ composable format
+ Iterator<FileRegion> i = iterator();
+ try {
+ while (i.hasNext()) {
+ FileRegion f = i.next();
+ if (f.getBlock().equals(ident)) {
+ return f;
+ }
+ }
+ } finally {
+ BufferedReader r = iterators.remove(i);
+ if (r != null) {
+ // null once the iterator was drained; the reader is closed already
+ r.close();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Iterator that always holds the next region pre-fetched in
+ * {@code pending}; {@code null} marks the end of the file.
+ */
+ class FRIterator implements Iterator<FileRegion> {
+
+ private FileRegion pending;
+
+ @Override
+ public boolean hasNext() {
+ return pending != null;
+ }
+
+ @Override
+ public FileRegion next() {
+ if (null == pending) {
+ throw new NoSuchElementException();
+ }
+ FileRegion ret = pending;
+ try {
+ pending = nextInternal(this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Reads and parses the next line for iterator {@code i}. Each line holds
+ * six fields in the order written by {@link TextWriter#store}.
+ *
+ * @return the parsed region, or {@code null} at the end of the file.
+ * @throws IllegalStateException if {@code i} has no registered reader.
+ * @throws IOException if reading fails or the line does not have six
+ * fields.
+ */
+ private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
+ BufferedReader r = iterators.get(i);
+ if (null == r) {
+ throw new IllegalStateException();
+ }
+ String line = r.readLine();
+ if (null == line) {
+ // End of file: unregister the iterator and release its stream, which
+ // nothing else would close once it has left the map.
+ iterators.remove(i);
+ r.close();
+ return null;
+ }
+ // NOTE(review): String.split treats delim as a regular expression while
+ // TextWriter emits it literally; delimiters such as "|" will not
+ // round-trip.
+ String[] f = line.split(delim);
+ if (f.length != 6) {
+ throw new IOException("Invalid line: " + line);
+ }
+ return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
+ Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
+ Long.parseLong(f[4]));
+ }
+
+ /**
+ * Opens the file, decompressing it when a codec is set.
+ *
+ * @return stream over the (decompressed) file contents.
+ * @throws IOException if the file or the codec stream cannot be opened.
+ */
+ public InputStream createStream() throws IOException {
+ InputStream i = fs.open(file);
+ if (codec == null) {
+ return i;
+ }
+ try {
+ return codec.createInputStream(i);
+ } catch (IOException | RuntimeException e) {
+ // The raw stream has no owner yet; close it before propagating.
+ try {
+ i.close();
+ } catch (IOException closeFailure) {
+ e.addSuppressed(closeFailure);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Opens a new pass over the file and pre-fetches its first entry.
+ *
+ * @return iterator over all regions in the file.
+ * @throws RuntimeException wrapping the {@link IOException} if the file
+ * cannot be opened or its first line cannot be read.
+ */
+ @Override
+ public Iterator<FileRegion> iterator() {
+ FRIterator i = new FRIterator();
+ BufferedReader r = null;
+ try {
+ r = new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
+ iterators.put(i, r);
+ i.pending = nextInternal(i);
+ } catch (IOException e) {
+ discard(i, r, e);
+ throw new RuntimeException(e);
+ } catch (RuntimeException e) {
+ discard(i, r, e);
+ throw e;
+ }
+ return i;
+ }
+
+ /**
+ * Unregisters an iterator that is never handed to a caller and closes its
+ * reader, recording a close failure as suppressed on {@code cause}.
+ */
+ private void discard(FRIterator i, BufferedReader r, Exception cause) {
+ iterators.remove(i);
+ if (r != null) {
+ try {
+ r.close();
+ } catch (IOException closeFailure) {
+ cause.addSuppressed(closeFailure);
+ }
+ }
+ }
+
+ /**
+ * Closes the readers of all iterators that are still registered.
+ *
+ * @throws IOException combining every failure raised while closing.
+ */
+ @Override
+ public void close() throws IOException {
+ ArrayList<IOException> ex = new ArrayList<>();
+ synchronized (iterators) {
+ for (Iterator<BufferedReader> i = iterators.values().iterator();
+ i.hasNext();) {
+ try {
+ BufferedReader r = i.next();
+ r.close();
+ } catch (IOException e) {
+ ex.add(e);
+ } finally {
+ i.remove();
+ }
+ }
+ iterators.clear();
+ }
+ if (!ex.isEmpty()) {
+ throw MultipleIOException.createIOException(ex);
+ }
+ }
+
+ }
+
+ /**
+ * Writer that stores each {@link FileRegion} as one line of delimited
+ * text.
+ */
+ public static class TextWriter extends Writer<FileRegion> {
+
+ /**
+ * Settings understood by {@link TextWriter}.
+ */
+ public interface Options extends Writer.Options {
+ Options codec(String codec);
+ Options filename(Path file);
+ Options delimiter(String delim);
+ }
+
+ public static WriterOptions defaults() {
+ return new WriterOptions();
+ }
+
+ private final String delim;
+ private final java.io.Writer out;
+
+ /**
+ * @param out destination of the text lines; closed by {@link #close()}.
+ * @param delim delimiter written between the fields of a line.
+ */
+ public TextWriter(java.io.Writer out, String delim) {
+ this.out = out;
+ this.delim = delim;
+ }
+
+ /**
+ * Appends one line with six fields: block id, path, offset, length,
+ * generation stamp and block pool id.
+ *
+ * @param token the region to write.
+ * @throws IOException if the underlying writer fails.
+ */
+ @Override
+ public void store(FileRegion token) throws IOException {
+ out.append(String.valueOf(token.getBlock().getBlockId())).append(delim)
+ .append(token.getPath().toString()).append(delim)
+ .append(Long.toString(token.getOffset())).append(delim)
+ .append(Long.toString(token.getLength())).append(delim)
+ .append(Long.toString(token.getGenerationStamp())).append(delim)
+ .append(token.getBlockPoolId()).append("\n");
+ }
+
+ /**
+ * Closes the underlying writer.
+ */
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ }
+
+ /**
+ * No-op: this implementation does nothing on refresh.
+ */
+ @Override
+ public void refresh() throws IOException {
+ //nothing to do;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java
new file mode 100644
index 0000000..b906791
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * The AliasMap defines mapping of PROVIDED HDFS blocks to data in remote
+ * storage systems.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.common.blockaliasmap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index d1a7015..092672d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.FileRegion;
-import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -68,7 +68,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
static class ProvidedBlockPoolSlice {
private ProvidedVolumeImpl providedVolume;
- private FileRegionProvider provider;
+ private BlockAliasMap<FileRegion> aliasMap;
private Configuration conf;
private String bpid;
private ReplicaMap bpVolumeMap;
@@ -77,29 +77,35 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
Configuration conf) {
this.providedVolume = volume;
bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
- Class<? extends FileRegionProvider> fmt =
- conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
- TextFileRegionProvider.class, FileRegionProvider.class);
- provider = ReflectionUtils.newInstance(fmt, conf);
+ Class<? extends BlockAliasMap> fmt =
+ conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
+ TextFileRegionAliasMap.class, BlockAliasMap.class);
+ aliasMap = ReflectionUtils.newInstance(fmt, conf);
this.conf = conf;
this.bpid = bpid;
bpVolumeMap.initBlockPool(bpid);
- LOG.info("Created provider: " + provider.getClass());
+ LOG.info("Created alias map using class: " + aliasMap.getClass());
}
- FileRegionProvider getFileRegionProvider() {
- return provider;
+ BlockAliasMap<FileRegion> getBlockAliasMap() {
+ return aliasMap;
}
@VisibleForTesting
- void setFileRegionProvider(FileRegionProvider newProvider) {
- this.provider = newProvider;
+ void setFileRegionProvider(BlockAliasMap<FileRegion> blockAliasMap) {
+ this.aliasMap = blockAliasMap;
}
public void getVolumeMap(ReplicaMap volumeMap,
RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS)
throws IOException {
- Iterator<FileRegion> iter = provider.iterator();
+ BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
+ if (reader == null) {
+ LOG.warn("Got null reader from BlockAliasMap " + aliasMap
+ + "; no blocks will be populated");
+ return;
+ }
+ Iterator<FileRegion> iter = reader.iterator();
while (iter.hasNext()) {
FileRegion region = iter.next();
if (region.getBlockPoolId() != null
@@ -140,14 +146,20 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
public void compileReport(LinkedList<ScanInfo> report,
ReportCompiler reportCompiler)
throws IOException, InterruptedException {
- /* refresh the provider and return the list of blocks found.
+ /* refresh the aliasMap and return the list of blocks found.
* the assumption here is that the block ids in the external
* block map, after the refresh, are consistent with those
* from before the refresh, i.e., for blocks which did not change,
* the ids remain the same.
*/
- provider.refresh();
- Iterator<FileRegion> iter = provider.iterator();
+ aliasMap.refresh();
+ BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
+ if (reader == null) {
+ LOG.warn("Got null reader from BlockAliasMap " + aliasMap
+ + "; no blocks will be populated in scan report");
+ return;
+ }
+ Iterator<FileRegion> iter = reader.iterator();
while(iter.hasNext()) {
reportCompiler.throttle();
FileRegion region = iter.next();
@@ -284,15 +296,15 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
private String bpid;
private String name;
- private FileRegionProvider provider;
+ private BlockAliasMap<FileRegion> blockAliasMap;
private Iterator<FileRegion> blockIterator;
private ProvidedBlockIteratorState state;
ProviderBlockIteratorImpl(String bpid, String name,
- FileRegionProvider provider) {
+ BlockAliasMap<FileRegion> blockAliasMap) {
this.bpid = bpid;
this.name = name;
- this.provider = provider;
+ this.blockAliasMap = blockAliasMap;
rewind();
}
@@ -330,7 +342,17 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
@Override
public void rewind() {
- blockIterator = provider.iterator();
+ BlockAliasMap.Reader<FileRegion> reader = null;
+ try {
+ reader = blockAliasMap.getReader(null);
+ } catch (IOException e) {
+ LOG.warn("Exception in getting reader from provided alias map");
+ }
+ if (reader != null) {
+ blockIterator = reader.iterator();
+ } else {
+ blockIterator = null;
+ }
state = new ProvidedBlockIteratorState();
}
@@ -372,14 +394,14 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
@Override
public BlockIterator newBlockIterator(String bpid, String name) {
return new ProviderBlockIteratorImpl(bpid, name,
- bpSlices.get(bpid).getFileRegionProvider());
+ bpSlices.get(bpid).getBlockAliasMap());
}
@Override
public BlockIterator loadBlockIterator(String bpid, String name)
throws IOException {
ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
- bpSlices.get(bpid).getFileRegionProvider());
+ bpSlices.get(bpid).getBlockAliasMap());
iter.load();
return iter;
}
@@ -425,8 +447,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
}
@VisibleForTesting
- FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
- return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
+ BlockAliasMap<FileRegion> getBlockFormat(String bpid) throws IOException {
+ return getProvidedBlockPoolSlice(bpid).getBlockAliasMap();
}
@Override
@@ -571,12 +593,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
}
@VisibleForTesting
- void setFileRegionProvider(String bpid, FileRegionProvider provider)
- throws IOException {
+ void setFileRegionProvider(String bpid,
+ BlockAliasMap<FileRegion> blockAliasMap) throws IOException {
ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
if (bp == null) {
throw new IOException("block pool " + bpid + " is not found");
}
- bp.setFileRegionProvider(provider);
+ bp.setFileRegionProvider(blockAliasMap);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0f1407a..835d8c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4630,26 +4630,6 @@
</property>
<property>
- <name>dfs.namenode.block.provider.class</name>
- <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value>
- <description>
- The class that is used to load provided blocks in the Namenode.
- </description>
- </property>
-
- <property>
- <name>dfs.provider.class</name>
- <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
- <description>
- The class that is used to load information about blocks stored in
- provided storages.
- org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
- is used as the default, which expects the blocks to be specified
- using a delimited text file.
- </description>
- </property>
-
- <property>
<name>dfs.provided.df.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
<description>
@@ -4666,12 +4646,12 @@
</property>
<property>
- <name>dfs.provided.blockformat.class</name>
- <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
+ <name>dfs.provided.aliasmap.class</name>
+ <value>org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap</value>
<description>
The class that is used to specify the input format of the blocks on
provided storages. The default is
- org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
+ org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap which uses
file regions to describe blocks. The file regions are specified as a
delimited text file. Each file region is a 6-tuple containing the
block id, remote file path, offset into file, length of block, the
@@ -4681,7 +4661,7 @@
</property>
<property>
- <name>dfs.provided.textprovider.delimiter</name>
+ <name>dfs.provided.aliasmap.text.delimiter</name>
<value>,</value>
<description>
The delimiter used when the provided block map is specified as
@@ -4690,7 +4670,7 @@
</property>
<property>
- <name>dfs.provided.textprovider.read.path</name>
+ <name>dfs.provided.aliasmap.text.read.path</name>
<value></value>
<description>
The path specifying the provided block map as a text file, specified as
@@ -4699,7 +4679,7 @@
</property>
<property>
- <name>dfs.provided.textprovider.read.codec</name>
+ <name>dfs.provided.aliasmap.text.codec</name>
<value></value>
<description>
The codec used to de-compress the provided block map.
@@ -4707,7 +4687,7 @@
</property>
<property>
- <name>dfs.provided.textprovider.write.path</name>
+ <name>dfs.provided.aliasmap.text.write.path</name>
<value></value>
<description>
The path to which the provided block map should be written as a text
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java
index 2296c82..89741b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java
@@ -17,20 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.RwLock;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.Iterator;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -47,37 +46,6 @@ public class TestProvidedStorageMap {
private RwLock nameSystemLock;
private String providedStorageID;
- static class TestBlockProvider extends BlockProvider
- implements Configurable {
-
- @Override
- public void setConf(Configuration conf) {
- }
-
- @Override
- public Configuration getConf() {
- return null;
- }
-
- @Override
- public Iterator<Block> iterator() {
- return new Iterator<Block>() {
- @Override
- public boolean hasNext() {
- return false;
- }
- @Override
- public Block next() {
- return null;
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- }
-
@Before
public void setup() {
providedStorageID = DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT;
@@ -85,8 +53,9 @@ public class TestProvidedStorageMap {
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
providedStorageID);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
- conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS,
- TestBlockProvider.class, BlockProvider.class);
+ conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
+ TestProvidedImpl.TestFileRegionBlockAliasMap.class,
+ BlockAliasMap.class);
bm = mock(BlockManager.class);
nameSystemLock = mock(RwLock.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
deleted file mode 100644
index eaaac22..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-/**
- * Test for the text based block format for provided block maps.
- */
-public class TestTextBlockFormat {
-
- static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
-
- void check(TextWriter.Options opts, final Path vp,
- final Class<? extends CompressionCodec> vc) throws IOException {
- TextFileRegionFormat mFmt = new TextFileRegionFormat() {
- @Override
- public TextWriter createWriter(Path file, CompressionCodec codec,
- String delim, Configuration conf) throws IOException {
- assertEquals(vp, file);
- if (null == vc) {
- assertNull(codec);
- } else {
- assertEquals(vc, codec.getClass());
- }
- return null; // ignored
- }
- };
- mFmt.getWriter(opts);
- }
-
- @Test
- public void testWriterOptions() throws Exception {
- TextWriter.Options opts = TextWriter.defaults();
- assertTrue(opts instanceof WriterOptions);
- WriterOptions wopts = (WriterOptions) opts;
- Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
- assertEquals(def, wopts.getFile());
- assertNull(wopts.getCodec());
-
- opts.filename(OUTFILE);
- check(opts, OUTFILE, null);
-
- opts.filename(OUTFILE);
- opts.codec("gzip");
- Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz");
- check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
-
- }
-
- @Test
- public void testCSVReadWrite() throws Exception {
- final DataOutputBuffer out = new DataOutputBuffer();
- FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
- FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
- FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
- try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
- csv.store(r1);
- csv.store(r2);
- csv.store(r3);
- }
- Iterator<FileRegion> i3;
- try (TextReader csv = new TextReader(null, null, null, ",") {
- @Override
- public InputStream createStream() {
- DataInputBuffer in = new DataInputBuffer();
- in.reset(out.getData(), 0, out.getLength());
- return in;
- }}) {
- Iterator<FileRegion> i1 = csv.iterator();
- assertEquals(r1, i1.next());
- Iterator<FileRegion> i2 = csv.iterator();
- assertEquals(r1, i2.next());
- assertEquals(r2, i2.next());
- assertEquals(r3, i2.next());
- assertEquals(r2, i1.next());
- assertEquals(r3, i1.next());
-
- assertFalse(i1.hasNext());
- assertFalse(i2.hasNext());
- i3 = csv.iterator();
- }
- try {
- i3.next();
- } catch (IllegalStateException e) {
- return;
- }
- fail("Invalid iterator");
- }
-
- @Test
- public void testCSVReadWriteTsv() throws Exception {
- final DataOutputBuffer out = new DataOutputBuffer();
- FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
- FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
- FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
- try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
- csv.store(r1);
- csv.store(r2);
- csv.store(r3);
- }
- Iterator<FileRegion> i3;
- try (TextReader csv = new TextReader(null, null, null, "\t") {
- @Override
- public InputStream createStream() {
- DataInputBuffer in = new DataInputBuffer();
- in.reset(out.getData(), 0, out.getLength());
- return in;
- }}) {
- Iterator<FileRegion> i1 = csv.iterator();
- assertEquals(r1, i1.next());
- Iterator<FileRegion> i2 = csv.iterator();
- assertEquals(r1, i2.next());
- assertEquals(r2, i2.next());
- assertEquals(r3, i2.next());
- assertEquals(r2, i1.next());
- assertEquals(r3, i1.next());
-
- assertFalse(i1.hasNext());
- assertFalse(i2.hasNext());
- i3 = csv.iterator();
- }
- try {
- i3.next();
- } catch (IllegalStateException e) {
- return;
- }
- fail("Invalid iterator");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ae7ea55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java
new file mode 100644
index 0000000..79308a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.*;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests the text-based alias map for provided blocks: TextFileRegionAliasMap and its TextWriter and TextReader.
+ */
+public class TestTextBlockAliasMap {
+
+ static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt"); // passed to opts.filename(...) and to the FileRegion constructors below
+
+ void check(TextWriter.Options opts, final Path vp,
+ final Class<? extends CompressionCodec> vc) throws IOException { // checks the file (vp) and codec class (vc; null means no codec expected) that createWriter receives
+ TextFileRegionAliasMap mFmt = new TextFileRegionAliasMap() {
+ @Override
+ public TextWriter createWriter(Path file, CompressionCodec codec,
+ String delim, Configuration conf) throws IOException {
+ assertEquals(vp, file);
+ if (null == vc) {
+ assertNull(codec);
+ } else {
+ assertEquals(vc, codec.getClass());
+ }
+ return null; // ignored: check() discards the result of getWriter
+ }
+ };
+ mFmt.getWriter(opts); // presumably delegates to createWriter, where the assertions above run; TODO confirm
+ }
+
+ @Test
+ public void testWriterOptions() throws Exception { // covers default options, an explicit filename, and a filename combined with the gzip codec
+ TextWriter.Options opts = TextWriter.defaults();
+ assertTrue(opts instanceof WriterOptions);
+ WriterOptions wopts = (WriterOptions) opts;
+ Path def = new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);
+ assertEquals(def, wopts.getFile());
+ assertNull(wopts.getCodec());
+
+ opts.filename(OUTFILE);
+ check(opts, OUTFILE, null);
+
+ opts.filename(OUTFILE);
+ opts.codec("gzip");
+ Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz"); // with gzip set, createWriter is expected to receive OUTFILE with ".gz" appended to its name
+ check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
+
+ }
+
+ @Test
+ public void testCSVReadWrite() throws Exception { // writes three regions with "," as the delimiter and reads them back
+ final DataOutputBuffer out = new DataOutputBuffer();
+ FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+ FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+ FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+ try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
+ csv.store(r1);
+ csv.store(r2);
+ csv.store(r3);
+ }
+ Iterator<FileRegion> i3; // declared outside the try block so it is still reachable after the reader is closed
+ try (TextReader csv = new TextReader(null, null, null, ",") {
+ @Override
+ public InputStream createStream() { // feeds the reader the bytes captured in out
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(out.getData(), 0, out.getLength());
+ return in;
+ }}) {
+ Iterator<FileRegion> i1 = csv.iterator();
+ assertEquals(r1, i1.next());
+ Iterator<FileRegion> i2 = csv.iterator(); // a second iterator is expected to start again at r1, independently of i1
+ assertEquals(r1, i2.next());
+ assertEquals(r2, i2.next());
+ assertEquals(r3, i2.next());
+ assertEquals(r2, i1.next());
+ assertEquals(r3, i1.next());
+
+ assertFalse(i1.hasNext());
+ assertFalse(i2.hasNext());
+ i3 = csv.iterator(); // obtained while the reader is open, used only after it has been closed
+ }
+ try {
+ i3.next(); // expected to throw IllegalStateException now that the reader is closed
+ } catch (IllegalStateException e) {
+ return;
+ }
+ fail("Invalid iterator"); // reached only if next() returned normally
+ }
+
+ @Test
+ public void testCSVReadWriteTsv() throws Exception { // same round trip as testCSVReadWrite, with "\t" as the delimiter
+ final DataOutputBuffer out = new DataOutputBuffer();
+ FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+ FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+ FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+ try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
+ csv.store(r1);
+ csv.store(r2);
+ csv.store(r3);
+ }
+ Iterator<FileRegion> i3; // declared outside the try block so it is still reachable after the reader is closed
+ try (TextReader csv = new TextReader(null, null, null, "\t") {
+ @Override
+ public InputStream createStream() { // feeds the reader the bytes captured in out
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(out.getData(), 0, out.getLength());
+ return in;
+ }}) {
+ Iterator<FileRegion> i1 = csv.iterator();
+ assertEquals(r1, i1.next());
+ Iterator<FileRegion> i2 = csv.iterator(); // a second iterator is expected to start again at r1, independently of i1
+ assertEquals(r1, i2.next());
+ assertEquals(r2, i2.next());
+ assertEquals(r3, i2.next());
+ assertEquals(r2, i1.next());
+ assertEquals(r3, i1.next());
+
+ assertFalse(i1.hasNext());
+ assertFalse(i2.hasNext());
+ i3 = csv.iterator(); // obtained while the reader is open, used only after it has been closed
+ }
+ try {
+ i3.next(); // expected to throw IllegalStateException now that the reader is closed
+ } catch (IllegalStateException e) {
+ return;
+ }
+ fail("Invalid iterator"); // reached only if next() returned normally
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org