You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/02 19:38:58 UTC
svn commit: r1308437 [3/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/
src/main/java/org/apache/hadoop/...
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * The underlying volume used to store replica.
+ *
+ * It uses the {@link FsDatasetImpl} object for synchronization.
+ */
+@InterfaceAudience.Private
+class FsVolumeImpl implements FsVolumeSpi {
+  private final FsDatasetImpl dataset;
+  private final String storageID;
+  /** Per-block-pool state on this volume, keyed by block pool ID. */
+  private final Map<String, BlockPoolSlice> bpSlices
+      = new HashMap<String, BlockPoolSlice>();
+  private final File currentDir;    // <StorageDirectory>/current
+  /** Disk statistics taken on the parent of {@link #currentDir}. */
+  private final DF usage;
+  /** Configured number of bytes excluded from the reported capacity. */
+  private final long reserved;
+
+  /**
+   * @param dataset the dataset object used as the lock when usage counters
+   *                of the block pool slices are read or updated
+   * @param storageID the storage ID reported by {@link #getStorageID()}
+   * @param currentDir the "current" directory of this volume; disk usage is
+   *                   measured on its parent directory
+   * @param conf supplies the reserved-space setting and is passed to DF
+   * @throws IOException if the disk usage helper cannot be created
+   */
+  FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
+      Configuration conf) throws IOException {
+    this.dataset = dataset;
+    this.storageID = storageID;
+    this.reserved = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
+        DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+    this.currentDir = currentDir;
+    File parent = currentDir.getParentFile();
+    this.usage = new DF(parent, conf);
+  }
+
+  File getCurrentDir() {
+    return currentDir;
+  }
+
+  /** @throws IOException if the block pool is not present on this volume */
+  File getRbwDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getRbwDir();
+  }
+
+  /**
+   * Decrease the recorded usage of a block pool.
+   * Silently does nothing if the block pool is not present on this volume.
+   */
+  void decDfsUsed(String bpid, long value) {
+    synchronized(dataset) {
+      BlockPoolSlice bp = bpSlices.get(bpid);
+      if (bp != null) {
+        bp.decDfsUsed(value);
+      }
+    }
+  }
+
+  /** @return the sum of the usage of all block pool slices on this volume */
+  long getDfsUsed() throws IOException {
+    long dfsUsed = 0;
+    synchronized(dataset) {
+      for(BlockPoolSlice s : bpSlices.values()) {
+        dfsUsed += s.getDfsUsed();
+      }
+    }
+    return dfsUsed;
+  }
+
+  /** @throws IOException if the block pool is not present on this volume */
+  long getBlockPoolUsed(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getDfsUsed();
+  }
+
+  /**
+   * Calculate the capacity of the filesystem, after removing any
+   * reserved capacity.
+   * @return the unreserved number of bytes left in this filesystem. May be zero.
+   */
+  long getCapacity() {
+    long remaining = usage.getCapacity() - reserved;
+    return remaining > 0 ? remaining : 0;
+  }
+
+  /**
+   * @return the smaller of (capacity - dfs used) and the space the disk
+   *         reports as available; never negative
+   */
+  @Override
+  public long getAvailable() throws IOException {
+    long remaining = getCapacity()-getDfsUsed();
+    long available = usage.getAvailable();
+    if (remaining > available) {
+      remaining = available;
+    }
+    return (remaining > 0) ? remaining : 0;
+  }
+
+  long getReserved(){
+    return reserved;
+  }
+
+  /**
+   * Look up the slice of a block pool.
+   * @throws IOException if the block pool is not present on this volume
+   */
+  BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
+    BlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp == null) {
+      throw new IOException("block pool " + bpid + " is not found");
+    }
+    return bp;
+  }
+
+  @Override
+  public String getPath(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
+  }
+
+  @Override
+  public File getFinalizedDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getFinalizedDir();
+  }
+
+  /**
+   * @return a new array holding the IDs of the block pools currently
+   *         registered on this volume; later changes to the volume are not
+   *         reflected in the returned array
+   */
+  @Override
+  public String[] getBlockPoolList() {
+    return bpSlices.keySet().toArray(new String[bpSlices.size()]);
+  }
+
+  /**
+   * Temporary files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createTmpFile(String bpid, Block b) throws IOException {
+    return getBlockPoolSlice(bpid).createTmpFile(b);
+  }
+
+  /**
+   * RBW files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createRbwFile(String bpid, Block b) throws IOException {
+    return getBlockPoolSlice(bpid).createRbwFile(b);
+  }
+
+  File addBlock(String bpid, Block b, File f) throws IOException {
+    return getBlockPoolSlice(bpid).addBlock(b, f);
+  }
+
+  /** Run the directory check of every block pool slice on this volume. */
+  void checkDirs() throws DiskErrorException {
+    // TODO:FEDERATION valid synchronization
+    for(BlockPoolSlice s : bpSlices.values()) {
+      s.checkDirs();
+    }
+  }
+
+  /** Let every block pool slice add its replicas to the given map. */
+  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+    for(BlockPoolSlice s : bpSlices.values()) {
+      s.getVolumeMap(volumeMap);
+    }
+  }
+
+  /** @throws IOException if the block pool is not present on this volume */
+  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+  }
+
+  /**
+   * Add replicas under the given directory to the volume map
+   * @param bpid the block pool whose slice performs the scan
+   * @param volumeMap the replicas map
+   * @param dir an input directory
+   * @param isFinalized true if the directory has finalized replicas;
+   *                    false if the directory has rbw replicas
+   * @throws IOException if the block pool is not present on this volume or
+   *                     the slice fails to add the replicas
+   */
+  void addToReplicasMap(String bpid, ReplicaMap volumeMap,
+      File dir, boolean isFinalized) throws IOException {
+    BlockPoolSlice bp = getBlockPoolSlice(bpid);
+    // TODO move this up
+    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
+    bp.addToReplicasMap(volumeMap, dir, isFinalized);
+  }
+
+  void clearPath(String bpid, File f) throws IOException {
+    getBlockPoolSlice(bpid).clearPath(f);
+  }
+
+  @Override
+  public String toString() {
+    return currentDir.getAbsolutePath();
+  }
+
+  /** Shut down every block pool slice; the slices stay registered. */
+  void shutdown() {
+    for (BlockPoolSlice s : bpSlices.values()) {
+      s.shutdown();
+    }
+  }
+
+  /**
+   * Create the slice for a block pool under {@code <currentDir>/<bpid>} and
+   * register it, replacing any slice previously registered for that ID.
+   */
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    File bpdir = new File(currentDir, bpid);
+    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+    bpSlices.put(bpid, bp);
+  }
+
+  /**
+   * Unregister a block pool and shut its slice down.
+   * Does nothing if the block pool is not present on this volume.
+   */
+  void shutdownBlockPool(String bpid) {
+    // A single remove() both unregisters the slice and tells us whether
+    // there was one to shut down.
+    BlockPoolSlice bp = bpSlices.remove(bpid);
+    if (bp != null) {
+      bp.shutdown();
+    }
+  }
+
+  /**
+   * @return false if the finalized or the rbw directory of the block pool
+   *         exists and has at least one entry; true otherwise
+   */
+  boolean isBPDirEmpty(String bpid) throws IOException {
+    File volumeCurrentDir = this.getCurrentDir();
+    File bpDir = new File(volumeCurrentDir, bpid);
+    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+    File finalizedDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_FINALIZED);
+    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+    if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+      return false;
+    }
+    if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Delete the on-disk directories of a block pool on this volume.
+   * Returns silently when the block pool directory does not exist.
+   *
+   * @param bpid the block pool to delete
+   * @param force true to delete the whole tree recursively; false to delete
+   *              step by step, which fails as soon as a directory that is
+   *              expected to be empty cannot be deleted
+   * @throws IOException if a file or directory cannot be deleted
+   */
+  void deleteBPDirectories(String bpid, boolean force) throws IOException {
+    File volumeCurrentDir = this.getCurrentDir();
+    File bpDir = new File(volumeCurrentDir, bpid);
+    if (!bpDir.isDirectory()) {
+      // nothing to be deleted
+      return;
+    }
+    if (force) {
+      FileUtil.fullyDelete(bpDir);
+      return;
+    }
+
+    File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
+    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+    File finalizedDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_FINALIZED);
+    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+
+    // File.delete() is not recursive, so these two calls only succeed when
+    // the replica directories are already empty.
+    deleteOrThrow(rbwDir);
+    deleteOrThrow(finalizedDir);
+    FileUtil.fullyDelete(tmpDir);
+    for (File f : FileUtil.listFiles(bpCurrentDir)) {
+      deleteOrThrow(f);
+    }
+    deleteOrThrow(bpCurrentDir);
+    for (File f : FileUtil.listFiles(bpDir)) {
+      deleteOrThrow(f);
+    }
+    deleteOrThrow(bpDir);
+  }
+
+  /** Delete a single file or empty directory, failing loudly otherwise. */
+  private static void deleteOrThrow(File f) throws IOException {
+    if (!f.delete()) {
+      throw new IOException("Failed to delete " + f);
+    }
+  }
+
+  String getStorageID() {
+    return storageID;
+  }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+class FsVolumeList {
+  /**
+   * Read access to this unmodifiable list is not synchronized.
+   * This list is replaced on modification holding "this" lock.
+   */
+  volatile List<FsVolumeImpl> volumes = null;
+
+  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+  private volatile int numFailedVolumes;
+
+  /**
+   * @param volumes the initial volumes; wrapped in an unmodifiable view
+   * @param failedVols the initial value of the failed-volume counter
+   * @param blockChooser the policy consulted by {@link #getNextVolume(long)}
+   */
+  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
+    this.blockChooser = blockChooser;
+    this.numFailedVolumes = failedVols;
+    this.volumes = Collections.unmodifiableList(volumes);
+  }
+
+  /** @return the initial failure count plus the volumes removed since */
+  int numberOfFailedVolumes() {
+    return numFailedVolumes;
+  }
+
+  /**
+   * Ask the volume choosing policy for the volume that should receive the
+   * next block. Synchronized on this object, like {@link #checkDirs()},
+   * which is the method that replaces {@link #volumes}.
+   * @param blockSize free space needed on the volume
+   * @return next volume to store the block in.
+   */
+  synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
+    return blockChooser.chooseVolume(volumes, blockSize);
+  }
+
+  /** @return the sum of the dfs usage of all volumes */
+  long getDfsUsed() throws IOException {
+    long total = 0L;
+    for (FsVolumeImpl vol : volumes) {
+      total += vol.getDfsUsed();
+    }
+    return total;
+  }
+
+  /** @return the sum of the usage of the given block pool on all volumes */
+  long getBlockPoolUsed(String bpid) throws IOException {
+    long total = 0L;
+    for (FsVolumeImpl vol : volumes) {
+      total += vol.getBlockPoolUsed(bpid);
+    }
+    return total;
+  }
+
+  /** @return the sum of the capacity of all volumes */
+  long getCapacity() {
+    long total = 0L;
+    for (FsVolumeImpl vol : volumes) {
+      total += vol.getCapacity();
+    }
+    return total;
+  }
+
+  /** @return the sum of the available space of all volumes */
+  long getRemaining() throws IOException {
+    long total = 0L;
+    for (FsVolumeSpi vol : volumes) {
+      total += vol.getAvailable();
+    }
+    return total;
+  }
+
+  /** Let every volume add its replicas to the given map. */
+  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+    for (FsVolumeImpl vol : volumes) {
+      vol.getVolumeMap(volumeMap);
+    }
+  }
+
+  /** Let every volume add its replicas of one block pool to the given map. */
+  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    for (FsVolumeImpl vol : volumes) {
+      vol.getVolumeMap(bpid, volumeMap);
+    }
+  }
+
+  /**
+   * Calls {@link FsVolumeImpl#checkDirs()} on each volume. A volume whose
+   * check throws a DiskErrorException is shut down, counted as failed and
+   * left out of the list that replaces {@link #volumes}.
+   *
+   * This method is synchronized so that only one check runs at a time.
+   * @return the removed volumes, or null if no volume was removed.
+   */
+  synchronized List<FsVolumeImpl> checkDirs() {
+    // Take one snapshot of the volatile field and work on that.
+    final List<FsVolumeImpl> current = volumes;
+    final List<FsVolumeImpl> healthy =
+        new ArrayList<FsVolumeImpl>(current.size());
+    List<FsVolumeImpl> failed = null;
+
+    for (FsVolumeImpl vol : current) {
+      try {
+        vol.checkDirs();
+      } catch (DiskErrorException e) {
+        FsDatasetImpl.LOG.warn("Removing failed volume " + vol + ": ",e);
+        if (failed == null) {
+          failed = new ArrayList<FsVolumeImpl>(1);
+        }
+        failed.add(vol);
+        vol.shutdown();
+        numFailedVolumes++;
+        continue;
+      }
+      healthy.add(vol);
+    }
+
+    // "failed" is only created when a volume is added to it, so non-null
+    // means at least one volume was removed.
+    if (failed != null) {
+      // Replace volume list
+      volumes = Collections.unmodifiableList(healthy);
+      FsDatasetImpl.LOG.info("Completed checkDirs. Removed " + failed.size()
+          + " volumes. Current volumes: " + this);
+    }
+
+    return failed;
+  }
+
+  @Override
+  public String toString() {
+    return volumes.toString();
+  }
+
+
+  /** Register the block pool on every volume. */
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    for (FsVolumeImpl vol : volumes) {
+      vol.addBlockPool(bpid, conf);
+    }
+  }
+
+  /** Shut the block pool down on every volume. */
+  void removeBlockPool(String bpid) {
+    for (FsVolumeImpl vol : volumes) {
+      vol.shutdownBlockPool(bpid);
+    }
+  }
+
+  /** Shut down every non-null volume in the list. */
+  void shutdown() {
+    for (FsVolumeImpl vol : volumes) {
+      if (vol == null) {
+        continue;
+      }
+      vol.shutdown();
+    }
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * A node type that can be built into a tree reflecting the
+ * hierarchy of replicas on the local disk.
+ */
+class LDir {
+  final File dir;
+  final int maxBlocksPerDir;
+
+  /** Number of block files directly in {@link #dir}; children not counted. */
+  private int numBlocks = 0;
+  /** Nodes of the sub-directories, or null if this node has none. */
+  private LDir[] children = null;
+  /** Child that last accepted a block; -1 after every child refused one. */
+  private int lastChildIdx = 0;
+
+  /**
+   * Build the node for a directory. A missing directory is created; an
+   * existing one is scanned, recursing into every sub-directory and
+   * counting the files recognized as block files.
+   *
+   * @param dir the directory represented by this node
+   * @param maxBlocksPerDir the block limit of this node, also used as the
+   *                        number of sub-directories created on demand
+   * @throws IOException if the directory cannot be created or listed
+   */
+  LDir(File dir, int maxBlocksPerDir) throws IOException {
+    this.dir = dir;
+    this.maxBlocksPerDir = maxBlocksPerDir;
+
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + dir);
+      }
+    } else {
+      List<LDir> dirList = new ArrayList<LDir>();
+      for (File file : FileUtil.listFiles(dir)) {
+        if (file.isDirectory()) {
+          dirList.add(new LDir(file, maxBlocksPerDir));
+        } else if (Block.isBlockFilename(file)) {
+          numBlocks++;
+        }
+      }
+      if (dirList.size() > 0) {
+        children = dirList.toArray(new LDir[dirList.size()]);
+      }
+    }
+  }
+
+  /**
+   * Move the files of a block into this tree.
+   * @return the file returned by FsDatasetImpl.moveBlockFiles for the
+   *         directory that accepted the block
+   */
+  File addBlock(Block b, File src) throws IOException {
+    //First try without creating subdirectories
+    File file = addBlock(b, src, false, false);
+    return (file != null) ? file : addBlock(b, src, true, true);
+  }
+
+  /**
+   * @param createOk whether the sub-directories may be created when neither
+   *                 this directory nor an existing child has room
+   * @param resetIdx whether a node whose children were all found full
+   *                 should search them again, from a random start
+   * @return the destination file, or null if nothing had room and
+   *         createOk is false
+   */
+  private File addBlock(Block b, File src, boolean createOk, boolean resetIdx
+      ) throws IOException {
+    if (numBlocks < maxBlocksPerDir) {
+      final File dest = FsDatasetImpl.moveBlockFiles(b, src, dir);
+      numBlocks += 1;
+      return dest;
+    }
+
+    // lastChildIdx only becomes negative in the search loop below, which
+    // runs only when children is non-null.
+    if (lastChildIdx < 0 && resetIdx) {
+      //reset so that all children will be checked
+      lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
+    }
+
+    if (lastChildIdx >= 0 && children != null) {
+      //Check if any child-tree has room for a block.
+      for (int i=0; i < children.length; i++) {
+        int idx = (lastChildIdx + i)%children.length;
+        File file = children[idx].addBlock(b, src, false, resetIdx);
+        if (file != null) {
+          lastChildIdx = idx;
+          return file;
+        }
+      }
+      lastChildIdx = -1;
+    }
+
+    if (!createOk) {
+      return null;
+    }
+
+    if (children == null || children.length == 0) {
+      children = new LDir[maxBlocksPerDir];
+      for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+        final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
+        children[idx] = new LDir(sub, maxBlocksPerDir);
+      }
+    }
+
+    //now pick a child randomly for creating a new set of subdirs.
+    lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
+    return children[ lastChildIdx ].addBlock(b, src, true, false);
+  }
+
+  /**
+   * Add the replicas of this tree to the volume map, children first.
+   * Unlinked tmp files in this directory are recovered before the
+   * directory itself is added.
+   */
+  void getVolumeMap(String bpid, ReplicaMap volumeMap, FsVolumeImpl volume
+      ) throws IOException {
+    if (children != null) {
+      for (LDir child : children) {
+        child.getVolumeMap(bpid, volumeMap, volume);
+      }
+    }
+
+    recoverTempUnlinkedBlock();
+    volume.addToReplicasMap(bpid, volumeMap, dir, true);
+  }
+
+  /**
+   * Recover unlinked tmp files on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name; otherwise the tmp file is deleted.
+   */
+  private void recoverTempUnlinkedBlock() throws IOException {
+    for (File file : FileUtil.listFiles(dir)) {
+      if (!FsDatasetUtil.isUnlinkTmpFile(file)) {
+        continue;
+      }
+      File blockFile = FsDatasetUtil.getOrigFile(file);
+      if (blockFile.exists()) {
+        // If the original block file still exists, then no recovery is needed.
+        if (!file.delete()) {
+          throw new IOException("Unable to cleanup unlinked tmp file " + file);
+        }
+      } else {
+        if (!file.renameTo(blockFile)) {
+          throw new IOException("Unable to cleanup detached file " + file);
+        }
+      }
+    }
+  }
+
+  /**
+   * check if a data directory is healthy
+   * @throws DiskErrorException if the check of this directory or of any
+   *                            directory below it fails
+   */
+  void checkDirTree() throws DiskErrorException {
+    DiskChecker.checkDir(dir);
+
+    if (children != null) {
+      for (LDir child : children) {
+        child.checkDirTree();
+      }
+    }
+  }
+
+  /**
+   * Decrement the block count of the node whose directory is {@code f}.
+   * The node is first looked up through the sub-directory numbers found in
+   * the path of {@code f}; if that fails the whole tree is searched.
+   */
+  void clearPath(File f) {
+    final String root = dir.getAbsolutePath();
+    final String path = f.getAbsolutePath();
+    if (path.startsWith(root)) {
+      // String.split() takes a regular expression. The separator is "\" on
+      // some platforms, so the delimiter has to be quoted to be matched
+      // literally; unquoted it never matches there and the lookup by index
+      // silently degrades to the full tree search below.
+      final String delimiter = java.util.regex.Pattern.quote(
+          File.separator + DataStorage.BLOCK_SUBDIR_PREFIX);
+      final String[] dirNames = path.substring(root.length()).split(delimiter);
+      if (clearPath(f, dirNames, 1)) {
+        return;
+      }
+    }
+    clearPath(f, null, -1);
+  }
+
+  /**
+   * dirNames is an array of string integers derived from
+   * usual directory structure data/subdirN/subdirXY/subdirM ...
+   * If dirName array is non-null, we only check the child at
+   * the children[dirNames[idx]]. This avoids iterating over
+   * children in common case. If directory structure changes
+   * in later versions, we need to revisit this.
+   *
+   * @return true if the node for {@code f} was found and its count decremented
+   */
+  private boolean clearPath(File f, String[] dirNames, int idx) {
+    if ((dirNames == null || idx == dirNames.length) &&
+        dir.compareTo(f) == 0) {
+      numBlocks--;
+      return true;
+    }
+
+    if (dirNames != null) {
+      //guess the child index from the directory name
+      if (idx > (dirNames.length - 1) || children == null) {
+        return false;
+      }
+      int childIdx;
+      try {
+        childIdx = Integer.parseInt(dirNames[idx]);
+      } catch (NumberFormatException ignored) {
+        // layout changed? we could print a warning.
+        return false;
+      }
+      return (childIdx >= 0 && childIdx < children.length) ?
+          children[childIdx].clearPath(f, dirNames, idx+1) : false;
+    }
+
+    //guesses failed. back to blind iteration.
+    if (children != null) {
+      for (LDir child : children) {
+        if (child.clearPath(f, null, -1)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "FSDir{dir=" + dir + ", children="
+        + (children == null ? null : Arrays.asList(children)) + "}";
+  }
+}
\ No newline at end of file
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java Mon Apr 2 17:38:56 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
import java.util.HashMap;
@@ -23,11 +23,12 @@ import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
/**
- * Maintains the replicas map.
+ * Maintains the replica map.
*/
-class ReplicasMap {
+class ReplicaMap {
// Object using which this class is synchronized
private final Object mutex;
@@ -35,7 +36,7 @@ class ReplicasMap {
private Map<String, Map<Long, ReplicaInfo>> map =
new HashMap<String, Map<Long, ReplicaInfo>>();
- ReplicasMap(Object mutex) {
+ ReplicaMap(Object mutex) {
if (mutex == null) {
throw new HadoopIllegalArgumentException(
"Object to synchronize on cannot be null");
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+
+class RollingLogsImpl implements RollingLogs { // two files per prefix: <prefix>.curr and <prefix>.prev
+ private static final String CURR_SUFFIX = ".curr";
+ private static final String PREV_SUFFIX = ".prev";
+
+ static boolean isFilePresent(String dir, String filePrefix) { // true if either of the two files exists
+ return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
+ new File(dir, filePrefix + PREV_SUFFIX).exists();
+ }
+
+ private final File curr; // file that new entries are appended to
+ private final File prev; // roll() renames curr to this file
+ private PrintStream out; //require synchronized access; null after the appender is closed
+
+ private Appender appender = new Appender() { // the single appender returned by appender()
+ @Override
+ public Appendable append(CharSequence csq) {
+ synchronized(RollingLogsImpl.this) {
+ if (out == null) {
+ throw new IllegalStateException(RollingLogsImpl.this
+ + " is not yet opened.");
+ }
+ out.print(csq);
+ }
+ return this;
+ }
+
+ @Override
+ public Appendable append(char c) {
+ throw new UnsupportedOperationException(); // only whole-CharSequence appends are supported
+ }
+
+ @Override
+ public Appendable append(CharSequence csq, int start, int end) {
+ throw new UnsupportedOperationException(); // only whole-CharSequence appends are supported
+ }
+
+ @Override
+ public void close() {
+ synchronized(RollingLogsImpl.this) {
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+ }
+ };
+
+
+ private final AtomicInteger numReaders = new AtomicInteger(); // readers not yet closed; roll() is refused while > 0
+
+ RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
+ curr = new File(dir, filePrefix + CURR_SUFFIX);
+ prev = new File(dir, filePrefix + PREV_SUFFIX);
+ out = new PrintStream(new FileOutputStream(curr, true)); // true = append to an existing curr file
+ }
+
+ @Override
+ public Reader iterator(boolean skipPrevFile) throws IOException {
+ numReaders.incrementAndGet(); // undone by Reader.close()
+ return new Reader(skipPrevFile);
+ }
+
+ @Override
+ public Appender appender() {
+ return appender;
+ }
+
+ @Override
+ public boolean roll() throws IOException {
+ if (numReaders.get() > 0) {
+ return false; // not rolled: a reader is still open
+ }
+ if (!prev.delete() && prev.exists()) { // a missing prev is fine; a prev that cannot be deleted is not
+ throw new IOException("Failed to delete " + prev);
+ }
+
+ synchronized(this) {
+ appender.close();
+ final boolean renamed = curr.renameTo(prev);
+ out = new PrintStream(new FileOutputStream(curr, true)); // reopened before the rename result is checked
+ if (!renamed) {
+ throw new IOException("Failed to rename " + curr + " to " + prev);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return curr.toString();
+ }
+
+ /**
+ * This is used to read the lines in order.
+ * If the data is not read completely (i.e, until hasNext() returns
+ * false), it needs to be explicitly closed: close() is what decrements
+ * the count of open readers that roll() checks. A reader that has been
+ * read to the end closes itself.
+ */
+ private class Reader implements RollingLogs.LineIterator {
+ private File file; // file being read; null once there is nothing left or after close()
+ private BufferedReader reader;
+ private String line; // look-ahead: the line the next call to next() returns
+ private boolean closed = false;
+
+ private Reader(boolean skipPrevFile) throws IOException {
+ reader = null;
+ file = skipPrevFile? curr : prev;
+ readNext(); // pre-fetch the first line
+ }
+
+ @Override
+ public boolean isPrevious() {
+ return file == prev;
+ }
+
+ private boolean openFile() throws IOException {
+
+ for(int i=0; i<2; i++) { // at most two candidates: the starting file and the one after it
+ if (reader != null || i > 0) {
+ // move to next file
+ file = isPrevious()? curr : null;
+ }
+ if (file == null) {
+ return false; // nothing left to open
+ }
+ if (file.exists()) {
+ break;
+ }
+ }
+
+ if (reader != null ) {
+ reader.close();
+ reader = null;
+ }
+
+ reader = new BufferedReader(new FileReader(file));
+ return true;
+ }
+
+ // read next line if possible.
+ private void readNext() throws IOException {
+ line = null;
+ try {
+ if (reader != null && (line = reader.readLine()) != null) {
+ return;
+ }
+ if (line == null) {
+ // move to the next file.
+ if (openFile()) {
+ readNext();
+ }
+ }
+ } finally {
+ if (!hasNext()) { // no line was fetched, whether at the end or because of an exception
+ close();
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return line != null;
+ }
+
+ @Override
+ public String next() {
+ String curLine = line;
+ try {
+ readNext();
+ } catch (IOException e) {
+ DataBlockScanner.LOG.warn("Failed to read next line.", e); // the line already fetched is still returned
+ }
+ return curLine;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) { // later calls are no-ops, so numReaders is decremented once per reader
+ try {
+ if (reader != null) {
+ reader.close();
+ }
+ } finally {
+ file = null;
+ reader = null;
+ closed = true;
+ final int n = numReaders.decrementAndGet();
+ assert(n >= 0);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java Mon Apr 2 17:38:56 2012
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java Mon Apr 2 17:38:56 2012
@@ -22,12 +22,16 @@ package org.apache.hadoop.hdfs.server.da
import java.io.File;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.mockito.Mockito;
import com.google.common.base.Preconditions;
@@ -100,6 +104,19 @@ public class DataNodeTestUtils {
return spy;
}
+ public static InterDatanodeProtocol createInterDatanodeProtocolProxy( // test helper wrapping DataNode.createInterDataNodeProtocolProxy
+ DataNode dn, DatanodeID datanodeid, final Configuration conf // dn is only consulted for its socket timeout
+ ) throws IOException {
+ return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
+ dn.getDnConf().socketTimeout); // timeout comes from dn's own DN configuration
+ }
+
+ /** Shut down the block scanner of the given datanode, if it has one. */
+ public static void shutdownBlockScanner(DataNode dn) {
+ // Nothing to stop when the datanode has no block scanner.
+ if (dn.blockScanner == null) {
+ return;
+ }
+ dn.blockScanner.shutdown();
+ }
+
/**
* This method is used for testing.
* Examples are adding and deleting blocks directly.
@@ -111,26 +128,22 @@ public class DataNodeTestUtils {
return dn.getFSDataset();
}
- public static FSDataset getFsDatasetImpl(DataNode dn) {
- return (FSDataset)dn.getFSDataset();
- }
-
public static File getFile(DataNode dn, String bpid, long bid) {
- return getFsDatasetImpl(dn).getFile(bpid, bid);
+ return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
}
public static File getBlockFile(DataNode dn, String bpid, Block b
) throws IOException {
- return getFsDatasetImpl(dn).getBlockFile(bpid, b);
+ return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
}
- public static boolean unlinkBlock(DataNode dn, ExtendedBlock block, int numLinks
+ public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
) throws IOException {
- return getFsDatasetImpl(dn).getReplicaInfo(block).unlinkBlock(numLinks);
+ return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
}
public static long getPendingAsyncDeletions(DataNode dn) {
- return getFsDatasetImpl(dn).asyncDiskService.countPendingDeletions();
+ return FsDatasetTestUtil.getPendingAsyncDeletions(dn.getFSDataset());
}
/**
@@ -142,6 +155,6 @@ public class DataNodeTestUtils {
*/
public static ReplicaInfo fetchReplicaInfo(final DataNode dn,
final String bpid, final long blkId) {
- return getFsDatasetImpl(dn).fetchReplicaInfo(bpid, blkId);
+ return FsDatasetTestUtil.fetchReplicaInfo(dn.getFSDataset(), bpid, blkId);
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Mon Apr 2 17:38:56 2012
@@ -613,14 +613,13 @@ public class TestBlockReport {
// Look about specified DN for the replica of the block from 1st DN
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
- final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
String bpid = cluster.getNamesystem().getBlockPoolId();
- Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
+ Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long start = System.currentTimeMillis();
int count = 0;
while (r == null) {
waitTil(5);
- r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
+ r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long waiting_period = System.currentTimeMillis() - start;
if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Apr 2 17:38:56 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -376,7 +377,7 @@ public class TestDataNodeVolumeFailure {
new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("blk_") &&
- name.endsWith(DatanodeUtil.METADATA_EXTENSION);
+ name.endsWith(Block.METADATA_EXTENSION);
}
}
);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Mon Apr 2 17:38:56 2012
@@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
/**
* Tests {@link DirectoryScanner} handling of differences
@@ -51,7 +53,7 @@ public class TestDirectoryScanner extend
private MiniDFSCluster cluster;
private String bpid;
- private FSDataset fds = null;
+ private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
private DirectoryScanner scanner = null;
private Random rand = new Random();
private Random r = new Random();
@@ -72,7 +74,7 @@ public class TestDirectoryScanner extend
/** Truncate a block file */
private long truncateBlockFile() throws IOException {
synchronized (fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+ for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
// Truncate a block file that has a corresponding metadata file
@@ -91,7 +93,7 @@ public class TestDirectoryScanner extend
/** Delete a block file */
private long deleteBlockFile() {
synchronized(fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+ for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
// Delete a block file that has corresponding metadata file
@@ -107,7 +109,7 @@ public class TestDirectoryScanner extend
/** Delete block meta file */
private long deleteMetaFile() {
synchronized(fds) {
- for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+ for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File file = b.getMetaFile();
// Delete a metadata file
if (file.exists() && file.delete()) {
@@ -124,7 +126,7 @@ public class TestDirectoryScanner extend
long id = rand.nextLong();
while (true) {
id = rand.nextLong();
- if (fds.fetchReplicaInfo(bpid, id) == null) {
+ if (FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, id) == null) {
break;
}
}
@@ -142,7 +144,7 @@ public class TestDirectoryScanner extend
/** Create a block file in a random volume*/
private long createBlockFile() throws IOException {
- List<FSVolume> volumes = fds.getVolumes();
+ List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -155,7 +157,7 @@ public class TestDirectoryScanner extend
/** Create a metafile in a random volume*/
private long createMetaFile() throws IOException {
- List<FSVolume> volumes = fds.getVolumes();
+ List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -168,7 +170,7 @@ public class TestDirectoryScanner extend
/** Create block file and corresponding metafile in a random volume */
private long createBlockMetaFile() throws IOException {
- List<FSVolume> volumes = fds.getVolumes();
+ List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -228,8 +230,7 @@ public class TestDirectoryScanner extend
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
- fds = (FSDataset)DataNodeTestUtils.getFSDataset(
- cluster.getDataNodes().get(0));
+ fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
DataNode dn = cluster.getDataNodes().get(0);
@@ -348,12 +349,13 @@ public class TestDirectoryScanner extend
private void verifyAddition(long blockId, long genStamp, long size) {
final ReplicaInfo replicainfo;
- replicainfo = fds.fetchReplicaInfo(bpid, blockId);
+ replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
assertNotNull(replicainfo);
// Added block has the same file as the one created by the test
File file = new File(getBlockFile(blockId));
- assertEquals(file.getName(), fds.getFile(bpid, blockId).getName());
+ assertEquals(file.getName(),
+ FsDatasetTestUtil.getFile(fds, bpid, blockId).getName());
// Generation stamp is same as that of created file
assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -364,12 +366,12 @@ public class TestDirectoryScanner extend
private void verifyDeletion(long blockId) {
// Ensure block does not exist in memory
- assertNull(fds.fetchReplicaInfo(bpid, blockId));
+ assertNull(FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId));
}
private void verifyGenStamp(long blockId, long genStamp) {
final ReplicaInfo memBlock;
- memBlock = fds.fetchReplicaInfo(bpid, blockId);
+ memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
assertNotNull(memBlock);
assertEquals(genStamp, memBlock.getGenerationStamp());
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Mon Apr 2 17:38:56 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -90,7 +91,7 @@ public class TestSimulatedFSDataset exte
public void testFSDatasetFactory() {
final Configuration conf = new Configuration();
FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
- assertEquals(FSDataset.Factory.class, f.getClass());
+ assertEquals(FsDatasetFactory.class, f.getClass());
assertFalse(f.isSimulated());
SimulatedFSDataset.setFactory(conf);
@@ -243,7 +244,7 @@ public class TestSimulatedFSDataset exte
}
}
- public void checkInvalidBlock(ExtendedBlock b) throws IOException {
+ public void checkInvalidBlock(ExtendedBlock b) {
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
assertFalse(fsdataset.isValidBlock(b));
try {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Mon Apr 2 17:38:56 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.log4j.Level;
import org.junit.Assert;
@@ -58,8 +59,8 @@ public class TestTransferRbw {
}
private static ReplicaInPipeline getReplica(final DataNode datanode,
final String bpid, final ReplicaState expectedState) throws InterruptedException {
- final FSDataset dataset = ((FSDataset)datanode.data);
- final Collection<ReplicaInfo> replicas = dataset.volumeMap.replicas(bpid);
+ final Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(
+ datanode.getFSDataset(), bpid);
for(int i = 0; i < 5 && replicas.size() == 0; i++) {
LOG.info("wait since replicas.size() == 0; i=" + i);
Thread.sleep(1000);
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+
+/**
+ * Test-only accessors that reach into {@link FsDatasetImpl} internals.
+ * Every method casts the supplied dataset to {@link FsDatasetImpl}, so a
+ * {@link ClassCastException} is thrown for any other implementation.
+ */
+public class FsDatasetTestUtil {
+
+ /** Narrow the SPI reference to the concrete implementation. */
+ private static FsDatasetImpl impl(FsDatasetSpi<?> fsd) {
+ return (FsDatasetImpl)fsd;
+ }
+
+ /** Delegates to {@code FsDatasetImpl.getFile(bpid, bid)}. */
+ public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
+ return impl(fsd).getFile(bpid, bid);
+ }
+
+ /** Delegates to {@code FsDatasetImpl.getBlockFile(bpid, b)}. */
+ public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
+ ) throws IOException {
+ return impl(fsd).getBlockFile(bpid, b);
+ }
+
+ /** Looks up the replica of the given block and calls unlinkBlock on it. */
+ public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
+ ExtendedBlock block, int numLinks) throws IOException {
+ return impl(fsd).getReplicaInfo(block).unlinkBlock(numLinks);
+ }
+
+ /** Delegates to {@code FsDatasetImpl.fetchReplicaInfo(bpid, blockId)}. */
+ public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
+ final String bpid, final long blockId) {
+ return impl(fsd).fetchReplicaInfo(bpid, blockId);
+ }
+
+ /** Returns the pending-deletion count reported by the async disk service. */
+ public static long getPendingAsyncDeletions(FsDatasetSpi<?> fsd) {
+ return impl(fsd).asyncDiskService.countPendingDeletions();
+ }
+
+ /** Returns the replicas that the dataset's volumeMap holds for the block pool. */
+ public static Collection<ReplicaInfo> getReplicas(FsDatasetSpi<?> fsd,
+ String bpid) {
+ return impl(fsd).volumeMap.replicas(bpid);
+ }
+}
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java Mon Apr 2 17:38:56 2012
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileInputStream;
@@ -36,7 +36,10 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
@@ -98,8 +101,8 @@ public class TestDatanodeRestart {
out.write(writeBuf);
out.hflush();
DataNode dn = cluster.getDataNodes().get(0);
- for (FsVolumeSpi v : dn.data.getVolumes()) {
- FSVolume volume = (FSVolume)v;
+ for (FsVolumeSpi v : dataset(dn).getVolumes()) {
+ final FsVolumeImpl volume = (FsVolumeImpl)v;
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) {
@@ -114,7 +117,7 @@ public class TestDatanodeRestart {
// check volumeMap: one rwr replica
String bpid = cluster.getNamesystem().getBlockPoolId();
- ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
+ ReplicaMap replicas = dataset(dn).volumeMap;
Assert.assertEquals(1, replicas.size(bpid));
ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
Assert.assertEquals(ReplicaState.RWR, replica.getState());
@@ -123,7 +126,7 @@ public class TestDatanodeRestart {
} else {
Assert.assertEquals(fileLen, replica.getNumBytes());
}
- dn.data.invalidate(bpid, new Block[]{replica});
+ dataset(dn).invalidate(bpid, new Block[]{replica});
} finally {
IOUtils.closeStream(out);
if (fs.exists(src)) {
@@ -151,7 +154,7 @@ public class TestDatanodeRestart {
String bpid = cluster.getNamesystem().getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
Iterator<ReplicaInfo> replicasItor =
- ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
+ dataset(dn).volumeMap.replicas(bpid).iterator();
ReplicaInfo replica = replicasItor.next();
createUnlinkTmpFile(replica, true, true); // rename block file
createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -167,8 +170,7 @@ public class TestDatanodeRestart {
dn = cluster.getDataNodes().get(0);
// check volumeMap: 4 finalized replica
- Collection<ReplicaInfo> replicas =
- ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
+ Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
Assert.assertEquals(4, replicas.size());
replicasItor = replicas.iterator();
while (replicasItor.hasNext()) {
@@ -180,6 +182,10 @@ public class TestDatanodeRestart {
}
}
+ private static FsDatasetImpl dataset(DataNode dn) { // fetch dn's dataset through DataNodeTestUtils
+ return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn); // the cast throws ClassCastException for any other dataset implementation
+ }
+
private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
boolean changeBlockFile,
boolean isRename) throws IOException {
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java Mon Apr 2 17:38:56 2012
@@ -15,22 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.List;
import java.net.InetSocketAddress;
-
import java.net.SocketTimeoutException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -41,16 +34,28 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -103,14 +108,14 @@ public class TestInterDatanodeProtocol {
}
public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
- Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
+ Block metainfo = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+ b.getBlockPoolId(), b.getBlockId());
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
}
public static LocatedBlock getLastLocatedBlock(
- ClientProtocol namenode, String src
- ) throws IOException {
+ ClientProtocol namenode, String src) throws IOException {
//get block info for the last block
LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
List<LocatedBlock> blocks = locations.getLocatedBlocks();
@@ -148,13 +153,11 @@ public class TestInterDatanodeProtocol {
//connect to a data node
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
- InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
- datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
+ InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
+ datanode, datanodeinfo[0], conf);
//stop block scanner, so we could compare lastScanTime
- if (datanode.blockScanner != null) {
- datanode.blockScanner.shutdown();
- }
+ DataNodeTestUtils.shutdownBlockScanner(datanode);
//verify BlockMetaDataInfo
ExtendedBlock b = locatedblock.getBlock();
@@ -187,14 +190,14 @@ public class TestInterDatanodeProtocol {
}
/** Test
- * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}
+ * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
*/
@Test
public void testInitReplicaRecovery() throws IOException {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
- final ReplicasMap map = new ReplicasMap(this);
+ final ReplicaMap map = new ReplicaMap(this);
String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {
@@ -208,7 +211,8 @@ public class TestInterDatanodeProtocol {
final ReplicaInfo originalInfo = map.get(bpid, b);
final long recoveryid = gs + 1;
- final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
+ final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
+ bpid, map, blocks[0], recoveryid);
assertEquals(originalInfo, recoveryInfo);
final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -217,7 +221,7 @@ public class TestInterDatanodeProtocol {
//recover one more time
final long recoveryid2 = gs + 2;
- final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
+ final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
assertEquals(originalInfo, recoveryInfo2);
final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -226,7 +230,7 @@ public class TestInterDatanodeProtocol {
//case RecoveryInProgressException
try {
- FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+ FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.fail();
}
catch(RecoveryInProgressException ripe) {
@@ -237,7 +241,7 @@ public class TestInterDatanodeProtocol {
{ // BlockRecoveryFI_01: replica not found
final long recoveryid = gs + 1;
final Block b = new Block(firstblockid - 1, length, gs);
- ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+ ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.assertNull("Data-node should not have this replica.", r);
}
@@ -245,7 +249,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid = gs - 1;
final Block b = new Block(firstblockid + 1, length, gs);
try {
- FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+ FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
Assert.fail();
}
catch(IOException ioe) {
@@ -258,7 +262,7 @@ public class TestInterDatanodeProtocol {
final long recoveryid = gs + 1;
final Block b = new Block(firstblockid, length, gs+1);
try {
- FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+ FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
fail("InitReplicaRecovery should fail because replica's " +
"gs is less than the block's gs");
} catch (IOException e) {
@@ -270,7 +274,7 @@ public class TestInterDatanodeProtocol {
/**
* Test for
- * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
+ * {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
* */
@Test
public void testUpdateReplicaUnderRecovery() throws IOException {
@@ -296,22 +300,22 @@ public class TestInterDatanodeProtocol {
//get DataNode and FSDataset objects
final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
Assert.assertTrue(datanode != null);
- Assert.assertTrue(datanode.data instanceof FSDataset);
- final FSDataset fsdataset = (FSDataset)datanode.data;
//initReplicaRecovery
final ExtendedBlock b = locatedblock.getBlock();
final long recoveryid = b.getGenerationStamp() + 1;
final long newlength = b.getNumBytes() - 1;
+ final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode);
final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(
new RecoveringBlock(b, null, recoveryid));
//check replica
- final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
+ final ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo(
+ fsdataset, bpid, b.getBlockId());
Assert.assertEquals(ReplicaState.RUR, replica.getState());
//check meta data before update
- FSDataset.checkReplicaFiles(replica);
+ FsDatasetImpl.checkReplicaFiles(replica);
//case "THIS IS NOT SUPPOSED TO HAPPEN"
//with (block length) != (stored replica's on disk length).
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java Mon Apr 2 17:38:56 2012
@@ -15,21 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaMap;
import org.junit.Before;
import org.junit.Test;
/**
* Unit test for ReplicasMap class
*/
-public class TestReplicasMap {
- private final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
+public class TestReplicaMap {
+ private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234);
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java Mon Apr 2 17:38:56 2012
@@ -15,14 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@@ -45,7 +54,7 @@ public class TestWriteToReplica {
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
- FSDataset dataSet = (FSDataset)dn.data;
+ FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -66,7 +75,7 @@ public class TestWriteToReplica {
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
- FSDataset dataSet = (FSDataset)dn.data;
+ FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -86,7 +95,7 @@ public class TestWriteToReplica {
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
- FSDataset dataSet = (FSDataset)dn.data;
+ FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -106,7 +115,7 @@ public class TestWriteToReplica {
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
- FSDataset dataSet = (FSDataset)dn.data;
+ FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -128,7 +137,7 @@ public class TestWriteToReplica {
* @return Contrived blocks for further testing.
* @throws IOException
*/
- private ExtendedBlock[] setup(String bpid, FSDataset dataSet) throws IOException {
+ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOException {
// setup replicas map
ExtendedBlock[] blocks = new ExtendedBlock[] {
@@ -137,8 +146,8 @@ public class TestWriteToReplica {
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
};
- ReplicasMap replicasMap = dataSet.volumeMap;
- FSVolume vol = dataSet.volumes.getNextVolume(0);
+ ReplicaMap replicasMap = dataSet.volumeMap;
+ FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
@@ -165,9 +174,9 @@ public class TestWriteToReplica {
return blocks;
}
- private void testAppend(String bpid, FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+ private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
- final FSVolume v = (FSVolume)dataSet.volumeMap.get(
+ final FsVolumeImpl v = (FsVolumeImpl)dataSet.volumeMap.get(
bpid, blocks[FINALIZED].getLocalBlock()).getVolume();
long available = v.getCapacity()-v.getDfsUsed();
long expectedLen = blocks[FINALIZED].getNumBytes();
@@ -285,7 +294,7 @@ public class TestWriteToReplica {
}
}
- private void testClose(FSDataset dataSet, ExtendedBlock [] blocks) throws IOException {
+ private void testClose(FsDatasetImpl dataSet, ExtendedBlock [] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
dataSet.recoverClose(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful
@@ -335,7 +344,7 @@ public class TestWriteToReplica {
}
}
- private void testWriteToRbw(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+ private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
dataSet.recoverRbw(blocks[FINALIZED],
blocks[FINALIZED].getGenerationStamp()+1,
@@ -428,7 +437,7 @@ public class TestWriteToReplica {
dataSet.createRbw(blocks[NON_EXISTENT]);
}
- private void testWriteToTemporary(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+ private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
dataSet.createTemporary(blocks[FINALIZED]);
Assert.fail("Should not have created a temporary replica that was " +