You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/09/22 16:34:28 UTC
[40/50] [abbrv] hbase git commit: HBASE-14439 Move fs stuff out pt2 -
region fs
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
new file mode 100644
index 0000000..d682ccc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.io.MultipleIOException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Utility class to handle the removal of HFiles (or the respective {@link StoreFile StoreFiles})
+ * for a HRegion from the {@link FileSystem}. The hfiles will be archived or deleted, depending on
+ * the state of the system.
+ */
+public class HFileArchiver {
+ private static final Log LOG = LogFactory.getLog(HFileArchiver.class);
+ private static final String SEPARATOR = ".";
+
+ /** Number of retries in case of fs operation failure */
+ private static final int DEFAULT_RETRIES_NUMBER = 3;
+
+ private HFileArchiver() {
+ // hidden ctor since this is just a util
+ }
+
+ /**
+ * Cleans up all the files for a HRegion by archiving the HFiles to the
+ * archive directory
+ * @param conf the configuration to use
+ * @param fs the file system object
+ * @param info HRegionInfo for region to be deleted
+ * @throws IOException
+ */
+ public static void archiveRegion(Configuration conf, FileSystem fs, HRegionInfo info)
+ throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+ archiveRegion(fs, rootDir, FSUtils.getTableDir(rootDir, info.getTable()),
+ HRegion.getRegionDir(rootDir, info));
+ }
+
+ /**
+ * Remove an entire region from the table directory via archiving the region's hfiles.
+ * @param fs {@link FileSystem} from which to remove the region
+ * @param rootdir {@link Path} to the root directory where hbase files are stored (for building
+ * the archive path)
+ * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+ * @param regionDir {@link Path} to where a region is being stored (for building the archive path)
+ * @return <tt>true</tt> if the region was sucessfully deleted. <tt>false</tt> if the filesystem
+ * operations could not complete.
+ * @throws IOException if the request cannot be completed
+ */
+ public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ARCHIVING " + regionDir.toString());
+ }
+
+ // otherwise, we archive the files
+ // make sure we can archive
+ if (tableDir == null || regionDir == null) {
+ LOG.error("No archive directory could be found because tabledir (" + tableDir
+ + ") or regiondir (" + regionDir + "was null. Deleting files instead.");
+ deleteRegionWithoutArchiving(fs, regionDir);
+ // we should have archived, but failed to. Doesn't matter if we deleted
+ // the archived files correctly or not.
+ return false;
+ }
+
+ // make sure the regiondir lives under the tabledir
+ Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString()));
+ Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir,
+ FSUtils.getTableName(tableDir),
+ regionDir.getName());
+
+ FileStatusConverter getAsFile = new FileStatusConverter(fs);
+ // otherwise, we attempt to archive the store files
+
+ // build collection of just the store directories to archive
+ Collection<File> toArchive = new ArrayList<File>();
+ final PathFilter dirFilter = new FSUtils.DirFilter(fs);
+ PathFilter nonHidden = new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ return dirFilter.accept(file) && !file.getName().toString().startsWith(".");
+ }
+ };
+ FileStatus[] storeDirs = FSUtils.listStatus(fs, regionDir, nonHidden);
+ // if there no files, we can just delete the directory and return;
+ if (storeDirs == null) {
+ LOG.debug("Region directory (" + regionDir + ") was empty, just deleting and returning!");
+ return deleteRegionWithoutArchiving(fs, regionDir);
+ }
+
+ // convert the files in the region to a File
+ toArchive.addAll(Lists.transform(Arrays.asList(storeDirs), getAsFile));
+ LOG.debug("Archiving " + toArchive);
+ boolean success = false;
+ try {
+ success = resolveAndArchive(fs, regionArchiveDir, toArchive);
+ } catch (IOException e) {
+ LOG.error("Failed to archive " + toArchive, e);
+ success = false;
+ }
+
+ // if that was successful, then we delete the region
+ if (success) {
+ return deleteRegionWithoutArchiving(fs, regionDir);
+ }
+
+ throw new IOException("Received error when attempting to archive files (" + toArchive
+ + "), cannot delete region directory. ");
+ }
+
+ /**
+ * Remove from the specified region the store files of the specified column family,
+ * either by archiving them or outright deletion
+ * @param fs the filesystem where the store files live
+ * @param conf {@link Configuration} to examine to determine the archive directory
+ * @param parent Parent region hosting the store files
+ * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+ * @param family the family hosting the store files
+ * @throws IOException if the files could not be correctly disposed.
+ */
+ public static void archiveFamily(FileSystem fs, Configuration conf,
+ HRegionInfo parent, Path tableDir, byte[] family) throws IOException {
+ Path familyDir = new Path(tableDir, new Path(parent.getEncodedName(), Bytes.toString(family)));
+ FileStatus[] storeFiles = FSUtils.listStatus(fs, familyDir);
+ if (storeFiles == null) {
+ LOG.debug("No store files to dispose for region=" + parent.getRegionNameAsString() +
+ ", family=" + Bytes.toString(family));
+ return;
+ }
+
+ FileStatusConverter getAsFile = new FileStatusConverter(fs);
+ Collection<File> toArchive = Lists.transform(Arrays.asList(storeFiles), getAsFile);
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, tableDir, family);
+
+ // do the actual archive
+ if (!resolveAndArchive(fs, storeArchiveDir, toArchive)) {
+ throw new IOException("Failed to archive/delete all the files for region:"
+ + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+ + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+ }
+ }
+
+ /**
+ * Remove the store files, either by archiving them or outright deletion
+ * @param conf {@link Configuration} to examine to determine the archive directory
+ * @param fs the filesystem where the store files live
+ * @param regionInfo {@link HRegionInfo} of the region hosting the store files
+ * @param family the family hosting the store files
+ * @param compactedFiles files to be disposed of. No further reading of these files should be
+ * attempted; otherwise likely to cause an {@link IOException}
+ * @throws IOException if the files could not be correctly disposed.
+ */
+ public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+ Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
+
+ // sometimes in testing, we don't have rss, so we need to check for that
+ if (fs == null) {
+ LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
+ + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
+ deleteStoreFilesWithoutArchiving(compactedFiles);
+ return;
+ }
+
+ // short circuit if we don't have any files to delete
+ if (compactedFiles.size() == 0) {
+ LOG.debug("No store files to dispose, done!");
+ return;
+ }
+
+ // build the archive path
+ if (regionInfo == null || family == null) throw new IOException(
+ "Need to have a region and a family to archive from.");
+
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+
+ // make sure we don't archive if we can't and that the archive dir exists
+ if (!fs.mkdirs(storeArchiveDir)) {
+ throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ + Bytes.toString(family) + ", deleting compacted files instead.");
+ }
+
+ // otherwise we attempt to archive the store files
+ if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
+
+ // Wrap the storefile into a File
+ StoreToFile getStorePath = new StoreToFile(fs);
+ Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
+
+ // do the actual archive
+ if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
+ throw new IOException("Failed to archive/delete all the files for region:"
+ + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
+ + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+ }
+ }
+
+ /**
+ * Archive the store file
+ * @param fs the filesystem where the store files live
+ * @param regionInfo region hosting the store files
+ * @param conf {@link Configuration} to examine to determine the archive directory
+ * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+ * @param family the family hosting the store files
+ * @param storeFile file to be archived
+ * @throws IOException if the files could not be correctly disposed.
+ */
+ public static void archiveStoreFile(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+ Path tableDir, byte[] family, Path storeFile) throws IOException {
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+ // make sure we don't archive if we can't and that the archive dir exists
+ if (!fs.mkdirs(storeArchiveDir)) {
+ throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ + Bytes.toString(family) + ", deleting compacted files instead.");
+ }
+
+ // do the actual archive
+ long start = EnvironmentEdgeManager.currentTime();
+ File file = new FileablePath(fs, storeFile);
+ if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) {
+ throw new IOException("Failed to archive/delete the file for region:"
+ + regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family)
+ + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+ }
+ }
+
+ /**
+ * Archive the given files and resolve any conflicts with existing files via appending the time
+ * archiving started (so all conflicts in the same group have the same timestamp appended).
+ * <p>
+ * If any of the passed files to archive are directories, archives all the files under that
+ * directory. Archive directory structure for children is the base archive directory name + the
+ * parent directory and is built recursively is passed files are directories themselves.
+ * @param fs {@link FileSystem} on which to archive the files
+ * @param baseArchiveDir base archive directory to archive the given files
+ * @param toArchive files to be archived
+ * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+ * @throws IOException on unexpected failure
+ */
+ private static boolean resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+ Collection<File> toArchive) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("Starting to archive " + toArchive);
+ long start = EnvironmentEdgeManager.currentTime();
+ List<File> failures = resolveAndArchive(fs, baseArchiveDir, toArchive, start);
+
+ // notify that some files were not archived.
+ // We can't delete the files otherwise snapshots or other backup system
+ // that relies on the archiver end up with data loss.
+ if (failures.size() > 0) {
+ LOG.warn("Failed to complete archive of: " + failures +
+ ". Those files are still in the original location, and they may slow down reads.");
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Resolve any conflict with an existing archive file via timestamp-append
+ * renaming of the existing file and then archive the passed in files.
+ * @param fs {@link FileSystem} on which to archive the files
+ * @param baseArchiveDir base archive directory to store the files. If any of
+ * the files to archive are directories, will append the name of the
+ * directory to the base archive directory name, creating a parallel
+ * structure.
+ * @param toArchive files/directories that need to be archvied
+ * @param start time the archiving started - used for resolving archive
+ * conflicts.
+ * @return the list of failed to archive files.
+ * @throws IOException if an unexpected file operation exception occured
+ */
+ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+ Collection<File> toArchive, long start) throws IOException {
+ // short circuit if no files to move
+ if (toArchive.size() == 0) return Collections.emptyList();
+
+ if (LOG.isTraceEnabled()) LOG.trace("moving files to the archive directory: " + baseArchiveDir);
+
+ // make sure the archive directory exists
+ if (!fs.exists(baseArchiveDir)) {
+ if (!fs.mkdirs(baseArchiveDir)) {
+ throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ + ", quitting archive attempt.");
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("Created archive directory:" + baseArchiveDir);
+ }
+
+ List<File> failures = new ArrayList<File>();
+ String startTime = Long.toString(start);
+ for (File file : toArchive) {
+ // if its a file archive it
+ try {
+ if (LOG.isTraceEnabled()) LOG.trace("Archiving: " + file);
+ if (file.isFile()) {
+ // attempt to archive the file
+ if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
+ LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
+ failures.add(file);
+ }
+ } else {
+ // otherwise its a directory and we need to archive all files
+ if (LOG.isTraceEnabled()) LOG.trace(file + " is a directory, archiving children files");
+ // so we add the directory name to the one base archive
+ Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
+ // and then get all the files from that directory and attempt to
+ // archive those too
+ Collection<File> children = file.getChildren();
+ failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to archive " + file, e);
+ failures.add(file);
+ }
+ }
+ return failures;
+ }
+
+ /**
+ * Attempt to archive the passed in file to the archive directory.
+ * <p>
+ * If the same file already exists in the archive, it is moved to a timestamped directory under
+ * the archive directory and the new file is put in its place.
+ * @param archiveDir {@link Path} to the directory that stores the archives of the hfiles
+ * @param currentFile {@link Path} to the original HFile that will be archived
+ * @param archiveStartTime time the archiving started, to resolve naming conflicts
+ * @return <tt>true</tt> if the file is successfully archived. <tt>false</tt> if there was a
+ * problem, but the operation still completed.
+ * @throws IOException on failure to complete {@link FileSystem} operations.
+ */
+ private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile,
+ String archiveStartTime) throws IOException {
+ // build path as it should be in the archive
+ String filename = currentFile.getName();
+ Path archiveFile = new Path(archiveDir, filename);
+ FileSystem fs = currentFile.getFileSystem();
+
+ // if the file already exists in the archive, move that one to a timestamped backup. This is a
+ // really, really unlikely situtation, where we get the same name for the existing file, but
+ // is included just for that 1 in trillion chance.
+ if (fs.exists(archiveFile)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File:" + archiveFile + " already exists in archive, moving to "
+ + "timestamped backup and overwriting current.");
+ }
+
+ // move the archive file to the stamped backup
+ Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
+ if (!fs.rename(archiveFile, backedupArchiveFile)) {
+ LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
+ + ", deleting existing file in favor of newer.");
+ // try to delete the exisiting file, if we can't rename it
+ if (!fs.delete(archiveFile, false)) {
+ throw new IOException("Couldn't delete existing archive file (" + archiveFile
+ + ") or rename it to the backup file (" + backedupArchiveFile
+ + ") to make room for similarly named file.");
+ }
+ }
+ LOG.debug("Backed up archive file from " + archiveFile);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No existing file in archive for: " + archiveFile +
+ ", free to archive original file.");
+ }
+
+ // at this point, we should have a free spot for the archive file
+ boolean success = false;
+ for (int i = 0; !success && i < DEFAULT_RETRIES_NUMBER; ++i) {
+ if (i > 0) {
+ // Ensure that the archive directory exists.
+ // The previous "move to archive" operation has failed probably because
+ // the cleaner has removed our archive directory (HBASE-7643).
+ // (we're in a retry loop, so don't worry too much about the exception)
+ try {
+ if (!fs.exists(archiveDir)) {
+ if (fs.mkdirs(archiveDir)) {
+ LOG.debug("Created archive directory:" + archiveDir);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to create directory: " + archiveDir, e);
+ }
+ }
+
+ try {
+ success = currentFile.moveAndClose(archiveFile);
+ } catch (IOException e) {
+ LOG.warn("Failed to archive " + currentFile + " on try #" + i, e);
+ success = false;
+ }
+ }
+
+ if (!success) {
+ LOG.error("Failed to archive " + currentFile);
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished archiving from " + currentFile + ", to " + archiveFile);
+ }
+ return true;
+ }
+
+ /**
+ * Without regard for backup, delete a region. Should be used with caution.
+ * @param regionDir {@link Path} to the region to be deleted.
+ * @param fs FileSystem from which to delete the region
+ * @return <tt>true</tt> on successful deletion, <tt>false</tt> otherwise
+ * @throws IOException on filesystem operation failure
+ */
+ private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir)
+ throws IOException {
+ if (fs.delete(regionDir, true)) {
+ LOG.debug("Deleted all region files in: " + regionDir);
+ return true;
+ }
+ LOG.debug("Failed to delete region directory:" + regionDir);
+ return false;
+ }
+
+ /**
+ * Just do a simple delete of the given store files
+ * <p>
+ * A best effort is made to delete each of the files, rather than bailing on the first failure.
+ * <p>
+ * This method is preferable to {@link #deleteFilesWithoutArchiving(Collection)} since it consumes
+ * less resources, but is limited in terms of usefulness
+ * @param compactedFiles store files to delete from the file system.
+ * @throws IOException if a file cannot be deleted. All files will be attempted to deleted before
+ * throwing the exception, rather than failing at the first file.
+ */
+ private static void deleteStoreFilesWithoutArchiving(Collection<StoreFile> compactedFiles)
+ throws IOException {
+ LOG.debug("Deleting store files without archiving.");
+ List<IOException> errors = new ArrayList<IOException>(0);
+ for (StoreFile hsf : compactedFiles) {
+ try {
+ hsf.deleteReader();
+ } catch (IOException e) {
+ LOG.error("Failed to delete store file:" + hsf.getPath());
+ errors.add(e);
+ }
+ }
+ if (errors.size() > 0) {
+ throw MultipleIOException.createIOException(errors);
+ }
+ }
+
+ /**
+ * Adapt a type to match the {@link File} interface, which is used internally for handling
+ * archival/removal of files
+ * @param <T> type to adapt to the {@link File} interface
+ */
+ private static abstract class FileConverter<T> implements Function<T, File> {
+ protected final FileSystem fs;
+
+ public FileConverter(FileSystem fs) {
+ this.fs = fs;
+ }
+ }
+
+ /**
+ * Convert a FileStatus to something we can manage in the archiving
+ */
+ private static class FileStatusConverter extends FileConverter<FileStatus> {
+ public FileStatusConverter(FileSystem fs) {
+ super(fs);
+ }
+
+ @Override
+ public File apply(FileStatus input) {
+ return new FileablePath(fs, input.getPath());
+ }
+ }
+
+ /**
+ * Convert the {@link StoreFile} into something we can manage in the archive
+ * methods
+ */
+ private static class StoreToFile extends FileConverter<StoreFile> {
+ public StoreToFile(FileSystem fs) {
+ super(fs);
+ }
+
+ @Override
+ public File apply(StoreFile input) {
+ return new FileableStoreFile(fs, input);
+ }
+ }
+
+ /**
+ * Wrapper to handle file operations uniformly
+ */
+ private static abstract class File {
+ protected final FileSystem fs;
+
+ public File(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ /**
+ * Delete the file
+ * @throws IOException on failure
+ */
+ abstract void delete() throws IOException;
+
+ /**
+ * Check to see if this is a file or a directory
+ * @return <tt>true</tt> if it is a file, <tt>false</tt> otherwise
+ * @throws IOException on {@link FileSystem} connection error
+ */
+ abstract boolean isFile() throws IOException;
+
+ /**
+ * @return if this is a directory, returns all the children in the
+ * directory, otherwise returns an empty list
+ * @throws IOException
+ */
+ abstract Collection<File> getChildren() throws IOException;
+
+ /**
+ * close any outside readers of the file
+ * @throws IOException
+ */
+ abstract void close() throws IOException;
+
+ /**
+ * @return the name of the file (not the full fs path, just the individual
+ * file name)
+ */
+ abstract String getName();
+
+ /**
+ * @return the path to this file
+ */
+ abstract Path getPath();
+
+ /**
+ * Move the file to the given destination
+ * @param dest
+ * @return <tt>true</tt> on success
+ * @throws IOException
+ */
+ public boolean moveAndClose(Path dest) throws IOException {
+ this.close();
+ Path p = this.getPath();
+ return FSUtils.renameAndSetModifyTime(fs, p, dest);
+ }
+
+ /**
+ * @return the {@link FileSystem} on which this file resides
+ */
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass() + ", file:" + getPath().toString();
+ }
+ }
+
+ /**
+ * A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}.
+ */
+ private static class FileablePath extends File {
+ private final Path file;
+ private final FileStatusConverter getAsFile;
+
+ public FileablePath(FileSystem fs, Path file) {
+ super(fs);
+ this.file = file;
+ this.getAsFile = new FileStatusConverter(fs);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file);
+ }
+
+ @Override
+ public String getName() {
+ return file.getName();
+ }
+
+ @Override
+ public Collection<File> getChildren() throws IOException {
+ if (fs.isFile(file)) return Collections.emptyList();
+ return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile);
+ }
+
+ @Override
+ public boolean isFile() throws IOException {
+ return fs.isFile(file);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // NOOP - files are implicitly closed on removal
+ }
+
+ @Override
+ Path getPath() {
+ return file;
+ }
+ }
+
+ /**
+ * {@link File} adapter for a {@link StoreFile} living on a {@link FileSystem}
+ * .
+ */
+ private static class FileableStoreFile extends File {
+ StoreFile file;
+
+ public FileableStoreFile(FileSystem fs, StoreFile store) {
+ super(fs);
+ this.file = store;
+ }
+
+ @Override
+ public void delete() throws IOException {
+ file.deleteReader();
+ }
+
+ @Override
+ public String getName() {
+ return file.getPath().getName();
+ }
+
+ @Override
+ public boolean isFile() {
+ return true;
+ }
+
+ @Override
+ public Collection<File> getChildren() throws IOException {
+ // storefiles don't have children
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws IOException {
+ file.closeReader(true);
+ }
+
+ @Override
+ Path getPath() {
+ return file.getPath();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
new file mode 100644
index 0000000..1386fe4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+
+public final class LegacyArchiver {
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
index 3af40fa..845ffce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
@@ -23,10 +23,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobConstants;
public final class LegacyLayout {
/** Name of the region info file that resides just under the region directory. */
- public final static String REGION_INFO_FILE = ".regioninfo";
+ private final static String REGION_INFO_FILE = ".regioninfo";
+
+ /** Temporary subdirectory of the region directory used for merges. */
+ public static final String REGION_MERGES_DIR = ".merges";
+
+ /** Temporary subdirectory of the region directory used for splits. */
+ public static final String REGION_SPLITS_DIR = ".splits";
+
+ /** Temporary subdirectory of the region directory used for compaction output. */
+ private static final String REGION_TEMP_DIR = ".tmp";
private LegacyLayout() {}
@@ -63,7 +73,43 @@ public final class LegacyLayout {
return new Path(nsDir, table.getQualifierAsString());
}
- public static Path getRegionDir(Path baseDataDir, TableName table, HRegionInfo hri) {
- return new Path(getTableDir(baseDataDir, table), hri.getEncodedName());
+ public static Path getRegionDir(Path tableDir, HRegionInfo hri) {
+ return new Path(tableDir, hri.getEncodedName());
+ }
+
+ public static Path getFamilyDir(Path regionDir, String familyName) {
+ return new Path(regionDir, familyName);
+ }
+
+ public static Path getStoreFile(Path familyDir, String fileName) {
+ return new Path(familyDir, fileName);
+ }
+
+ public static Path getRegionInfoFile(Path regionDir) {
+ return new Path(regionDir, REGION_INFO_FILE);
+ }
+
+ public static Path getRegionTempDir(Path regionDir) {
+ return new Path(regionDir, REGION_TEMP_DIR);
+ }
+
+ public static Path getRegionMergesDir(Path regionDir) {
+ return new Path(regionDir, REGION_MERGES_DIR);
+ }
+
+ public static Path getRegionMergesDir(Path mergeDir, HRegionInfo hri) {
+ return new Path(mergeDir, hri.getEncodedName());
+ }
+
+ public static Path getRegionSplitsDir(Path regionDir) {
+ return new Path(regionDir, REGION_SPLITS_DIR);
+ }
+
+ public static Path getRegionSplitsDir(Path splitDir, HRegionInfo hri) {
+ return new Path(splitDir, hri.getEncodedName());
+ }
+
+ public static Path getMobDir(Path rootDir) {
+ return new Path(rootDir, MobConstants.MOB_DIR_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
index f330b6e..8f81bd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -159,6 +160,32 @@ public class LegacyMasterFileSystem extends MasterFileSystem {
}
// ==========================================================================
+ // PUBLIC Methods - Table Regions related
+ // ==========================================================================
+ @Override
+ public Collection<HRegionInfo> getRegions(FsContext ctx, TableName tableName)
+ throws IOException {
+ FileStatus[] stats = FSUtils.listStatus(getFileSystem(),
+ getTableDir(ctx, tableName), new FSUtils.RegionDirFilter(getFileSystem()));
+ if (stats == null) return Collections.emptyList();
+
+ ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>(stats.length);
+ for (int i = 0; i < stats.length; ++i) {
+ regions.add(loadRegionInfo(stats[i].getPath()));
+ }
+ return regions;
+ }
+
+ protected HRegionInfo loadRegionInfo(Path regionDir) throws IOException {
+ FSDataInputStream in = getFileSystem().open(LegacyLayout.getRegionInfoFile(regionDir));
+ try {
+ return HRegionInfo.parseFrom(in);
+ } finally {
+ in.close();
+ }
+ }
+
+ // ==========================================================================
// PROTECTED Methods - Bootstrap
// ==========================================================================
@Override
@@ -242,7 +269,7 @@ public class LegacyMasterFileSystem extends MasterFileSystem {
}
protected Path getRegionDir(FsContext ctx, TableName table, HRegionInfo hri) {
- return LegacyLayout.getRegionDir(getBaseDirFromContext(ctx), table, hri);
+ return LegacyLayout.getRegionDir(getTableDir(ctx, table), hri);
}
public Path getTempDir() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
new file mode 100644
index 0000000..d07a1f1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
@@ -0,0 +1,758 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.fs.FSUtilsWithRetries;
+import org.apache.hadoop.hbase.fs.FsContext;
+import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+
+@InterfaceAudience.Private
+public class LegacyRegionFileSystem extends RegionFileSystem {
+ private static final Log LOG = LogFactory.getLog(LegacyRegionFileSystem.class);
+
+ private final Path tableDir;
+ private final Path regionDir;
+ private final Path mobDir;
+
+ // regionInfo for interacting with FS (getting encodedName, etc)
+ private final HRegionInfo regionInfoForFs;
+
+ private final FSUtilsWithRetries fsWithRetries;
+
+ public LegacyRegionFileSystem(Configuration conf, FileSystem fs, Path rootDir, HRegionInfo hri) {
+ super(conf, fs, rootDir, hri);
+
+ Path dataDir = LegacyLayout.getDataDir(rootDir);
+ this.tableDir = LegacyLayout.getTableDir(dataDir, hri.getTable());
+ this.regionDir = LegacyLayout.getRegionDir(tableDir, hri);
+ this.mobDir = LegacyLayout.getDataDir(LegacyLayout.getMobDir(rootDir));
+ this.fsWithRetries = new FSUtilsWithRetries(conf, fs);
+
+ this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(hri);
+ }
+
+ private Path getRegionDir() {
+ return regionDir;
+ }
+
+ // ==========================================================================
+ // PUBLIC Methods - Families Related
+ // ==========================================================================
+ @Override
+ public Collection<String> getFamilies() throws IOException {
+ FileSystem fs = getFileSystem();
+ FileStatus[] fds = FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
+ if (fds == null) return null;
+
+ ArrayList<String> families = new ArrayList<String>(fds.length);
+ for (FileStatus status: fds) {
+ families.add(status.getPath().getName());
+ }
+ return families;
+ }
+
+ @Override
+ public void deleteFamily(String familyName, boolean hasMob) throws IOException {
+ // archive family store files
+ byte[] bFamilyName = Bytes.toBytes(familyName);
+
+ FileSystem fs = getFileSystem();
+ HFileArchiver.archiveFamily(fs, getConfiguration(), getRegionInfo(), tableDir, bFamilyName);
+
+ // delete the family folder
+ HRegionInfo region = getRegionInfo();
+ Path familyDir = new Path(tableDir, new Path(region.getEncodedName(), familyName));
+ if (!fsWithRetries.deleteDir(familyDir)) {
+ throw new IOException("Could not delete family "
+ + familyName + " from FileSystem for region "
+ + region.getRegionNameAsString() + "(" + region.getEncodedName()
+ + ")");
+ }
+
+ // archive and delete mob files
+ if (hasMob) {
+ Path mobTableDir = LegacyLayout.getTableDir(mobDir, getTable());
+ HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(getTable());
+ Path mobRegionDir = LegacyLayout.getRegionDir(mobTableDir, mobRegionInfo);
+ Path mobFamilyDir = LegacyLayout.getFamilyDir(mobRegionDir, familyName);
+ // archive mob family store files
+ MobUtils.archiveMobStoreFiles(getConfiguration(), getFileSystem(),
+ mobRegionInfo, mobFamilyDir, bFamilyName);
+
+ if (!fsWithRetries.deleteDir(mobFamilyDir)) {
+ throw new IOException("Could not delete mob store files for family "
+ + familyName + " from FileSystem region "
+ + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")");
+ }
+ }
+ }
+
+ // ===========================================================================
+ // Temp Helpers
+ // ===========================================================================
+ /** @return {@link Path} to the region's temp directory, used for file creations */
+ Path getTempDir() {
+ return LegacyLayout.getRegionTempDir(regionDir);
+ }
+
+ /**
+ * Clean up any temp detritus that may have been left around from previous operation attempts.
+ */
+ void cleanupTempDir() throws IOException {
+ fsWithRetries.deleteDir(getTempDir());
+ }
+
+ // ===========================================================================
+ // Store/StoreFile Helpers
+ // ===========================================================================
+ /**
+ * Returns the directory path of the specified family
+ * @param familyName Column Family Name
+ * @return {@link Path} to the directory of the specified family
+ */
+ public Path getStoreDir(final String familyName) {
+ return LegacyLayout.getFamilyDir(getRegionDir(), familyName);
+ }
+
+ /**
+ * Create the store directory for the specified family name
+ * @param familyName Column Family Name
+ * @return {@link Path} to the directory of the specified family
+ * @throws IOException if the directory creation fails.
+ */
+ Path createStoreDir(final String familyName) throws IOException {
+ Path storeDir = getStoreDir(familyName);
+ if (!fsWithRetries.createDir(storeDir))
+ throw new IOException("Failed creating "+storeDir);
+ return storeDir;
+ }
+
+ // ==========================================================================
+ // PUBLIC Methods - Store Files related
+ // ==========================================================================
+
+ @Override
+ public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+ throws IOException {
+ Path familyDir = getStoreDir(familyName);
+ FileStatus[] files = FSUtils.listStatus(getFileSystem(), familyDir);
+ if (files == null) {
+ LOG.debug("No StoreFiles for: " + familyDir);
+ return null;
+ }
+
+ ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
+ for (FileStatus status: files) {
+ if (validate && !StoreFileInfo.isValid(status)) {
+ LOG.warn("Invalid StoreFile: " + status.getPath());
+ continue;
+ }
+ StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(),
+ getFileSystem(), getRegionInfo(), regionInfoForFs, familyName, status.getPath());
+ storeFiles.add(info);
+
+ }
+ return storeFiles;
+ }
+
+
+ /**
+ * Return Qualified Path of the specified family/file
+ *
+ * @param familyName Column Family Name
+ * @param fileName File Name
+ * @return The qualified Path for the specified family/file
+ */
+ Path getStoreFilePath(final String familyName, final String fileName) {
+ Path familyDir = getStoreDir(familyName);
+ return LegacyLayout.getStoreFile(familyDir, fileName).makeQualified(getFileSystem());
+ }
+
+ /**
+ * Return the store file information of the specified family/file.
+ *
+ * @param familyName Column Family Name
+ * @param fileName File Name
+ * @return The {@link StoreFileInfo} for the specified family/file
+ */
+ StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
+ throws IOException {
+ Path familyDir = getStoreDir(familyName);
+ return ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(),
+ getFileSystem(), getRegionInfo(), regionInfoForFs, familyName,
+ LegacyLayout.getStoreFile(familyDir, fileName));
+ }
+
+ /**
+ * Returns true if the specified family has reference files
+ * @param familyName Column Family Name
+ * @return true if family contains reference files
+ * @throws IOException
+ */
+ public boolean hasReferences(final String familyName) throws IOException {
+ FileStatus[] files = FSUtils.listStatus(getFileSystem(),
+ getStoreDir(familyName), new FSUtils.ReferenceFileFilter(getFileSystem()));
+ return files != null && files.length > 0;
+ }
+
+ /**
+ * Check whether region has Reference file
+ * @param htd table desciptor of the region
+ * @return true if region has reference file
+ * @throws IOException
+ */
+ public boolean hasReferences(final HTableDescriptor htd) throws IOException {
+ for (HColumnDescriptor family : htd.getFamilies()) {
+ if (hasReferences(family.getNameAsString())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ /**
+ * Generate a unique file name, used by createTempName() and commitStoreFile()
+ * @param suffix extra information to append to the generated name
+ * @return Unique file name
+ */
+ private static String generateUniqueName(final String suffix) {
+ String name = UUID.randomUUID().toString().replaceAll("-", "");
+ if (suffix != null) name += suffix;
+ return name;
+ }
+
+ /**
+ * Generate a unique temporary Path. Used in conjuction with commitStoreFile()
+ * to get a safer file creation.
+ * <code>
+ * Path file = fs.createTempName();
+ * ...StoreFile.Writer(file)...
+ * fs.commitStoreFile("family", file);
+ * </code>
+ *
+ * @return Unique {@link Path} of the temporary file
+ */
+ public Path createTempName() {
+ return createTempName(null);
+ }
+
+ /**
+ * Generate a unique temporary Path. Used in conjuction with commitStoreFile()
+ * to get a safer file creation.
+ * <code>
+ * Path file = fs.createTempName();
+ * ...StoreFile.Writer(file)...
+ * fs.commitStoreFile("family", file);
+ * </code>
+ *
+ * @param suffix extra information to append to the generated name
+ * @return Unique {@link Path} of the temporary file
+ */
+ public Path createTempName(final String suffix) {
+ return new Path(getTempDir(), generateUniqueName(suffix));
+ }
+
+ /**
+ * Move the file from a build/temp location to the main family store directory.
+ * @param familyName Family that will gain the file
+ * @param buildPath {@link Path} to the file to commit.
+ * @return The new {@link Path} of the committed file
+ * @throws IOException
+ */
+ public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
+ return commitStoreFile(familyName, buildPath, -1, false);
+ }
+
+ /**
+ * Move the file from a build/temp location to the main family store directory.
+ * @param familyName Family that will gain the file
+ * @param buildPath {@link Path} to the file to commit.
+ * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
+ * @param generateNewName False if you want to keep the buildPath name
+ * @return The new {@link Path} of the committed file
+ * @throws IOException
+ */
+ private Path commitStoreFile(final String familyName, final Path buildPath,
+ final long seqNum, final boolean generateNewName) throws IOException {
+ Path storeDir = getStoreDir(familyName);
+ if(!fsWithRetries.createDir(storeDir))
+ throw new IOException("Failed creating " + storeDir);
+
+ String name = buildPath.getName();
+ if (generateNewName) {
+ name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+ }
+ Path dstPath = new Path(storeDir, name);
+ if (!fsWithRetries.exists(buildPath)) {
+ throw new FileNotFoundException(buildPath.toString());
+ }
+ LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+ // buildPath exists, therefore not doing an exists() check.
+ if (!fsWithRetries.rename(buildPath, dstPath)) {
+ throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
+ }
+ return dstPath;
+ }
+
+
+ /**
+ * Moves multiple store files to the relative region's family store directory.
+ * @param storeFiles list of store files divided by family
+ * @throws IOException
+ */
+ void commitStoreFiles(final Map<byte[], List<StoreFile>> storeFiles) throws IOException {
+ for (Map.Entry<byte[], List<StoreFile>> es: storeFiles.entrySet()) {
+ String familyName = Bytes.toString(es.getKey());
+ for (StoreFile sf: es.getValue()) {
+ commitStoreFile(familyName, sf.getPath());
+ }
+ }
+ }
+
+ /**
+ * Archives the specified store file from the specified family.
+ * @param familyName Family that contains the store files
+ * @param filePath {@link Path} to the store file to remove
+ * @throws IOException if the archiving fails
+ */
+ public void removeStoreFile(final String familyName, final Path filePath)
+ throws IOException {
+ HFileArchiver.archiveStoreFile(getConfiguration(), getFileSystem(), this.regionInfoForFs,
+ this.tableDir, Bytes.toBytes(familyName), filePath);
+ }
+
+ /**
+ * Closes and archives the specified store files from the specified family.
+ * @param familyName Family that contains the store files
+ * @param storeFiles set of store files to remove
+ * @throws IOException if the archiving fails
+ */
+ public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
+ throws IOException {
+ HFileArchiver.archiveStoreFiles(getConfiguration(), getFileSystem(), this.regionInfoForFs,
+ this.tableDir, Bytes.toBytes(familyName), storeFiles);
+ }
+
+ /**
+ * Bulk load: Add a specified store file to the specified family.
+ * If the source file is on the same different file-system is moved from the
+ * source location to the destination location, otherwise is copied over.
+ *
+ * @param familyName Family that will gain the file
+ * @param srcPath {@link Path} to the file to import
+ * @param seqNum Bulk Load sequence number
+ * @return The destination {@link Path} of the bulk loaded file
+ * @throws IOException
+ */
+ Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+ throws IOException {
+ // Copy the file if it's on another filesystem
+ FileSystem fs = getFileSystem();
+ FileSystem srcFs = srcPath.getFileSystem(getConfiguration());
+ FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
+
+ // We can't compare FileSystem instances as equals() includes UGI instance
+ // as part of the comparison and won't work when doing SecureBulkLoad
+ // TODO deal with viewFS
+ if (!FSHDFSUtils.isSameHdfs(getConfiguration(), srcFs, desFs)) {
+ LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
+ "the destination store. Copying file over to destination filesystem.");
+ Path tmpPath = createTempName();
+ FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, getConfiguration());
+ LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
+ srcPath = tmpPath;
+ }
+
+ return commitStoreFile(familyName, srcPath, seqNum, true);
+ }
+
+ // ===========================================================================
+ // Splits Helpers
+ // ===========================================================================
+ /** @return {@link Path} to the temp directory used during split operations */
+ Path getSplitsDir() {
+ return LegacyLayout.getRegionSplitsDir(getRegionDir());
+ }
+
+ Path getSplitsDir(final HRegionInfo hri) {
+ return LegacyLayout.getRegionSplitsDir(getSplitsDir(), hri);
+ }
+
+ /**
+ * Clean up any split detritus that may have been left around from previous split attempts.
+ */
+ void cleanupSplitsDir() throws IOException {
+ fsWithRetries.deleteDir(getSplitsDir());
+ }
+
+ /**
+ * Clean up any split detritus that may have been left around from previous
+ * split attempts.
+ * Call this method on initial region deploy.
+ * @throws IOException
+ */
+ void cleanupAnySplitDetritus() throws IOException {
+ Path splitdir = this.getSplitsDir();
+ if (!fsWithRetries.exists(splitdir)) return;
+ // Look at the splitdir. It could have the encoded names of the daughter
+ // regions we tried to make. See if the daughter regions actually got made
+ // out under the tabledir. If here under splitdir still, then the split did
+ // not complete. Try and do cleanup. This code WILL NOT catch the case
+ // where we successfully created daughter a but regionserver crashed during
+ // the creation of region b. In this case, there'll be an orphan daughter
+ // dir in the filesystem. TOOD: Fix.
+ FileSystem fs = getFileSystem();
+ FileStatus[] daughters = FSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
+ if (daughters != null) {
+ for (int i = 0; i < daughters.length; ++i) {
+ Path daughterDir = new Path(this.tableDir, daughters[i].getPath().getName());
+ if (!fsWithRetries.deleteDir(daughterDir)) {
+ throw new IOException("Failed delete of " + daughterDir);
+ }
+ }
+ }
+ cleanupSplitsDir();
+ LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
+ }
+
+ /**
+ * Remove daughter region
+ * @param regionInfo daughter {@link HRegionInfo}
+ * @throws IOException
+ */
+ void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
+ Path regionDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+ if (!fsWithRetries.deleteDir(regionDir)) {
+ throw new IOException("Failed delete of " + regionDir);
+ }
+ }
+
+ /**
+ * Commit a daughter region, moving it from the split temporary directory
+ * to the proper location in the filesystem.
+ *
+ * @param regionInfo daughter {@link org.apache.hadoop.hbase.HRegionInfo}
+ * @throws IOException
+ */
+ Path commitDaughterRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ Path regionDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+ Path daughterTmpDir = this.getSplitsDir(regionInfo);
+
+ if (fsWithRetries.exists(daughterTmpDir)) {
+
+ // Write HRI to a file in case we need to recover hbase:meta
+ Path regionInfoFile = LegacyLayout.getRegionInfoFile(daughterTmpDir);
+ byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
+ writeRegionInfoFileContent(getConfiguration(), getFileSystem(), regionInfoFile, regionInfoContent);
+
+ // Move the daughter temp dir to the table dir
+ if (!fsWithRetries.rename(daughterTmpDir, regionDir)) {
+ throw new IOException("Unable to rename " + daughterTmpDir + " to " + regionDir);
+ }
+ }
+
+ return regionDir;
+ }
+
+ /**
+ * Create the region splits directory.
+ */
+ void createSplitsDir() throws IOException {
+ createTempDir(getSplitsDir());
+ }
+
+ void createTempDir(Path dir) throws IOException {
+ if (fsWithRetries.exists(dir)) {
+ LOG.info("The " + dir + " directory exists. Hence deleting it to recreate it");
+ if (!fsWithRetries.deleteDir(dir)) {
+ throw new IOException("Failed deletion of " + dir + " before creating them again.");
+ }
+ }
+ // dir doesn't exists now. No need to do an exists() call for it.
+ if (!fsWithRetries.createDir(dir)) {
+ throw new IOException("Failed create of " + dir);
+ }
+ }
+
+ /**
+ * Write out a split reference. Package local so it doesnt leak out of
+ * regionserver.
+ * @param hri {@link HRegionInfo} of the destination
+ * @param familyName Column Family Name
+ * @param f File to split.
+ * @param splitRow Split Row
+ * @param top True if we are referring to the top half of the hfile.
+ * @param splitPolicy
+ * @return Path to created reference.
+ * @throws IOException
+ */
+ Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
+ final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy)
+ throws IOException {
+
+ if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
+ // Check whether the split row lies in the range of the store file
+ // If it is outside the range, return directly.
+ try {
+ if (top) {
+ //check if larger than last key.
+ KeyValue splitKey = KeyValueUtil.createFirstOnRow(splitRow);
+ Cell lastKey = f.createReader().getLastKey();
+ // If lastKey is null means storefile is empty.
+ if (lastKey == null) {
+ return null;
+ }
+ if (f.getReader().getComparator().compare(splitKey, lastKey) > 0) {
+ return null;
+ }
+ } else {
+ //check if smaller than first key
+ KeyValue splitKey = KeyValueUtil.createLastOnRow(splitRow);
+ Cell firstKey = f.createReader().getFirstKey();
+ // If firstKey is null means storefile is empty.
+ if (firstKey == null) {
+ return null;
+ }
+ if (f.getReader().getComparator().compare(splitKey, firstKey) < 0) {
+ return null;
+ }
+ }
+ } finally {
+ f.closeReader(true);
+ }
+ }
+
+ Path splitDir = new Path(getSplitsDir(hri), familyName);
+ // A reference to the bottom half of the hsf store file.
+ Reference r =
+ top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
+ // Add the referred-to regions name as a dot separated suffix.
+ // See REF_NAME_REGEX regex above. The referred-to regions name is
+ // up in the path of the passed in <code>f</code> -- parentdir is family,
+ // then the directory above is the region name.
+ String parentRegionName = regionInfoForFs.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
+ return r.write(getFileSystem(), p);
+ }
+
+ // ===========================================================================
+ // Merge Helpers
+ // ===========================================================================
+ /** @return {@link Path} to the temp directory used during merge operations */
+ Path getMergesDir() {
+ return LegacyLayout.getRegionMergesDir(getRegionDir());
+ }
+
+ Path getMergesDir(final HRegionInfo hri) {
+ return LegacyLayout.getRegionMergesDir(getMergesDir(), hri);
+ }
+
+ /**
+ * Clean up any merge detritus that may have been left around from previous merge attempts.
+ */
+ void cleanupMergesDir() throws IOException {
+ fsWithRetries.deleteDir(getMergesDir());
+ }
+
+ /**
+ * Remove merged region
+ * @param mergedRegion {@link HRegionInfo}
+ * @throws IOException
+ */
+ void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException {
+ Path regionDir = LegacyLayout.getRegionDir(tableDir, mergedRegion);
+ if (fsWithRetries.deleteDir(regionDir)) {
+ throw new IOException("Failed delete of " + regionDir);
+ }
+ }
+
+ /**
+ * Create the region merges directory.
+ * @throws IOException If merges dir already exists or we fail to create it.
+ * @see HRegionFileSystem#cleanupMergesDir()
+ */
+ void createMergesDir() throws IOException {
+ createTempDir(getMergesDir());
+ }
+
+ /**
+ * Write out a merge reference under the given merges directory. Package local
+ * so it doesnt leak out of regionserver.
+ * @param mergedRegion {@link HRegionInfo} of the merged region
+ * @param familyName Column Family Name
+ * @param f File to create reference.
+ * @param mergedDir
+ * @return Path to created reference.
+ * @throws IOException
+ */
+ Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName,
+ final StoreFile f, final Path mergedDir)
+ throws IOException {
+ Path referenceDir = new Path(new Path(mergedDir,
+ mergedRegion.getEncodedName()), familyName);
+ // A whole reference to the store file.
+ Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
+ // Add the referred-to regions name as a dot separated suffix.
+ // See REF_NAME_REGEX regex above. The referred-to regions name is
+ // up in the path of the passed in <code>f</code> -- parentdir is family,
+ // then the directory above is the region name.
+ String mergingRegionName = regionInfoForFs.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(referenceDir, f.getPath().getName() + "."
+ + mergingRegionName);
+ return r.write(getFileSystem(), p);
+ }
+
+ /**
+ * Commit a merged region, moving it from the merges temporary directory to
+ * the proper location in the filesystem.
+ * @param mergedRegionInfo merged region {@link HRegionInfo}
+ * @throws IOException
+ */
+ void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
+ Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
+ Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
+ // Move the tmp dir in the expected location
+ if (mergedRegionTmpDir != null && fsWithRetries.exists(mergedRegionTmpDir)) {
+ if (!fsWithRetries.rename(mergedRegionTmpDir, regionDir)) {
+ throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
+ + regionDir);
+ }
+ }
+ }
+
+ // ===========================================================================
+ // Create/Open/Delete Helpers
+ // ===========================================================================
+ /**
+ * Log the current state of the region
+ * @param LOG log to output information
+ * @throws IOException if an unexpected exception occurs
+ */
+ void logFileSystemState(final Log LOG) throws IOException {
+ FSUtils.logFileSystemState(getFileSystem(), this.getRegionDir(), LOG);
+ }
+
+ /**
+ * @param hri
+ * @return Content of the file we write out to the filesystem under a region
+ * @throws IOException
+ */
+ private static byte[] getRegionInfoFileContent(final HRegionInfo hri) throws IOException {
+ return hri.toDelimitedByteArray();
+ }
+
+ /**
+ * Write the .regioninfo file on-disk.
+ */
+ private static void writeRegionInfoFileContent(final Configuration conf, final FileSystem fs,
+ final Path regionInfoFile, final byte[] content) throws IOException {
+ // First check to get the permissions
+ FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+ // Write the RegionInfo file content
+ FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
+ try {
+ out.write(content);
+ } finally {
+ out.close();
+ }
+ }
+
+ // ==========================================================================
+ // PUBLIC bootstrap
+ // ==========================================================================
+ @Override
+ protected void bootstrap() throws IOException {
+ // Cleanup temporary directories
+ cleanupTempDir();
+ cleanupSplitsDir();
+ cleanupMergesDir();
+
+ // if it doesn't exists, Write HRI to a file, in case we need to recover hbase:meta
+ checkRegionInfoOnFilesystem();
+ }
+
+ private void checkRegionInfoOnFilesystem() throws IOException {
+ // TODO
+ }
+
+ @Override
+ protected void destroy() throws IOException {
+ fsWithRetries.deleteDir(regionDir);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index ba08a05..e875f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -677,10 +677,9 @@ public class RegionStates {
// This is RPC to meta table. It is done while we have a synchronize on
// regionstates. No progress will be made if meta is not available at this time.
// This is a cleanup task. Not critical.
- if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
- null) {
+ if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) == null) {
regionOffline(hri);
- FSUtils.deleteRegionDir(server.getConfiguration(), hri);
+ server.getMasterFileSystem().deleteRegion(hri);
}
} catch (IOException e) {
LOG.warn("Got exception while deleting " + hri + " directories from file system.", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 3c12045..d022c99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -277,19 +278,48 @@ public class StoreFileInfo {
return reader;
}
+ private interface ComputeFileInfo<T> {
+ T compute(FileSystem fs, FileStatus status, long offset, long length)
+ throws IOException;
+ }
+
/**
* Compute the HDFS Block Distribution for this StoreFile
*/
public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
throws IOException {
+ return computeFileInfo(fs, new ComputeFileInfo<HDFSBlocksDistribution>() {
+ @Override
+ public HDFSBlocksDistribution compute(FileSystem fs, FileStatus status,
+ long offset, long length) throws IOException {
+ return FSUtils.computeHDFSBlocksDistribution(fs, status, offset, length);
+ }
+ });
+ }
+ public BlockLocation[] getFileBlockLocations(final FileSystem fs)
+ throws IOException {
+ return computeFileInfo(fs, new ComputeFileInfo<BlockLocation[]>() {
+ @Override
+ public BlockLocation[] compute(FileSystem fs, FileStatus status,
+ long offset, long length) throws IOException {
+ return fs.getFileBlockLocations(status, offset, length);
+ }
+ });
+ }
+
+ /**
+ * Compute the HDFS Block Distribution for this StoreFile
+ */
+ private <T> T computeFileInfo(final FileSystem fs,
+ final ComputeFileInfo<T> computeObj) throws IOException {
// guard against the case where we get the FileStatus from link, but by the time we
// call compute the file is moved again
if (this.link != null) {
FileNotFoundException exToThrow = null;
for (int i = 0; i < this.link.getLocations().length; i++) {
try {
- return computeHDFSBlocksDistributionInternal(fs);
+ return computeFileInfoInternal(fs, computeObj);
} catch (FileNotFoundException ex) {
// try the other location
exToThrow = ex;
@@ -297,21 +327,52 @@ public class StoreFileInfo {
}
throw exToThrow;
} else {
- return computeHDFSBlocksDistributionInternal(fs);
+ return computeFileInfoInternal(fs, computeObj);
}
}
- private HDFSBlocksDistribution computeHDFSBlocksDistributionInternal(final FileSystem fs)
+ private <T> T computeFileInfoInternal(final FileSystem fs, final ComputeFileInfo<T> computeObj)
throws IOException {
FileStatus status = getReferencedFileStatus(fs);
if (this.reference != null) {
- return computeRefFileHDFSBlockDistribution(fs, reference, status);
+ return computeRefFileInfo(fs, reference, status, computeObj);
} else {
- return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ return computeObj.compute(fs, status, 0, status.getLen());
}
}
/**
+ * helper function to compute HDFS blocks distribution of a given reference
+ * file.For reference file, we don't compute the exact value. We use some
+ * estimate instead given it might be good enough. we assume bottom part
+ * takes the first half of reference file, top part takes the second half
+ * of the reference file. This is just estimate, given
+ * midkey ofregion != midkey of HFile, also the number and size of keys vary.
+ * If this estimate isn't good enough, we can improve it later.
+ * @param fs The FileSystem
+ * @param reference The reference
+ * @param status The reference FileStatus
+ * @return HDFS blocks distribution
+ */
+ private static <T> T computeRefFileInfo(final FileSystem fs, final Reference reference,
+ final FileStatus status, final ComputeFileInfo<T> computeObj) throws IOException {
+ if (status == null) {
+ return null;
+ }
+
+ long start = 0;
+ long length = 0;
+ if (Reference.isTopFileRegion(reference.getFileRegion())) {
+ start = status.getLen()/2;
+ length = status.getLen() - status.getLen()/2;
+ } else {
+ start = 0;
+ length = status.getLen()/2;
+ }
+ return computeObj.compute(fs, status, start, length);
+ }
+
+ /**
* Get the {@link FileStatus} of the file referenced by this StoreFileInfo
* @param fs The current file system to use.
* @return The {@link FileStatus} of the file referenced by this StoreFileInfo
@@ -496,39 +557,6 @@ public class StoreFileInfo {
return validateStoreFileName(p.getName());
}
- /**
- * helper function to compute HDFS blocks distribution of a given reference
- * file.For reference file, we don't compute the exact value. We use some
- * estimate instead given it might be good enough. we assume bottom part
- * takes the first half of reference file, top part takes the second half
- * of the reference file. This is just estimate, given
- * midkey ofregion != midkey of HFile, also the number and size of keys vary.
- * If this estimate isn't good enough, we can improve it later.
- * @param fs The FileSystem
- * @param reference The reference
- * @param status The reference FileStatus
- * @return HDFS blocks distribution
- */
- private static HDFSBlocksDistribution computeRefFileHDFSBlockDistribution(
- final FileSystem fs, final Reference reference, final FileStatus status)
- throws IOException {
- if (status == null) {
- return null;
- }
-
- long start = 0;
- long length = 0;
-
- if (Reference.isTopFileRegion(reference.getFileRegion())) {
- start = status.getLen()/2;
- length = status.getLen() - status.getLen()/2;
- } else {
- start = 0;
- length = status.getLen()/2;
- }
- return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
- }
-
@Override
public boolean equals(Object that) {
if (this == that) return true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/fe2918ba/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
index b0af52b..d38d5c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
@@ -25,7 +25,13 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionFileSystem.StoreFileVisitor;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -41,30 +47,18 @@ import org.apache.hadoop.hbase.util.FSUtils;
class FSRegionScanner implements Runnable {
static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
- private Path regionPath;
-
- /**
- * The file system used
- */
- private FileSystem fs;
-
- /**
- * Maps each region to the RS with highest locality for that region.
- */
- private Map<String,String> regionToBestLocalityRSMapping;
+ private final RegionFileSystem rfs;
/**
* Maps region encoded names to maps of hostnames to fractional locality of
* that region on that host.
*/
- private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
+ private final Map<String, Map<String, Float>> regionDegreeLocalityMapping;
- FSRegionScanner(FileSystem fs, Path regionPath,
- Map<String, String> regionToBestLocalityRSMapping,
- Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
- this.fs = fs;
- this.regionPath = regionPath;
- this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
+ FSRegionScanner(Configuration conf, HRegionInfo hri,
+ Map<String, Map<String, Float>> regionDegreeLocalityMapping)
+ throws IOException {
+ this.rfs = RegionFileSystem.open(conf, hri, true);
this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
}
@@ -72,38 +66,17 @@ class FSRegionScanner implements Runnable {
public void run() {
try {
// empty the map for each region
- Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
-
- //get table name
- String tableName = regionPath.getParent().getName();
- int totalBlkCount = 0;
+ final Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
+ final AtomicInteger totalBlkCount = new AtomicInteger(0);
- // ignore null
- FileStatus[] cfList = fs.listStatus(regionPath, new FSUtils.FamilyDirFilter(fs));
- if (null == cfList) {
- return;
- }
-
- // for each cf, get all the blocks information
- for (FileStatus cfStatus : cfList) {
- if (!cfStatus.isDirectory()) {
- // skip because this is not a CF directory
- continue;
- }
-
- FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
- if (null == storeFileLists) {
- continue;
- }
-
- for (FileStatus storeFile : storeFileLists) {
- BlockLocation[] blkLocations =
- fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
- if (null == blkLocations) {
- continue;
- }
+ rfs.visitStoreFiles(new StoreFileVisitor() {
+ @Override
+ public void storeFile(HRegionInfo region, String family, StoreFileInfo storeFile)
+ throws IOException {
+ BlockLocation[] blkLocations = storeFile.getFileBlockLocations(rfs.getFileSystem());
+ if (blkLocations == null) return;
- totalBlkCount += blkLocations.length;
+ totalBlkCount.addAndGet(blkLocations.length);
for(BlockLocation blk: blkLocations) {
for (String host: blk.getHosts()) {
AtomicInteger count = blockCountMap.get(host);
@@ -115,36 +88,9 @@ class FSRegionScanner implements Runnable {
}
}
}
- }
-
- if (regionToBestLocalityRSMapping != null) {
- int largestBlkCount = 0;
- String hostToRun = null;
- for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
- String host = entry.getKey();
-
- int tmp = entry.getValue().get();
- if (tmp > largestBlkCount) {
- largestBlkCount = tmp;
- hostToRun = host;
- }
- }
-
- // empty regions could make this null
- if (null == hostToRun) {
- return;
- }
-
- if (hostToRun.endsWith(".")) {
- hostToRun = hostToRun.substring(0, hostToRun.length()-1);
- }
- String name = tableName + ":" + regionPath.getName();
- synchronized (regionToBestLocalityRSMapping) {
- regionToBestLocalityRSMapping.put(name, hostToRun);
- }
- }
+ });
- if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
+ if (regionDegreeLocalityMapping != null && totalBlkCount.get() > 0) {
Map<String, Float> hostLocalityMap = new HashMap<String, Float>();
for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
String host = entry.getKey();
@@ -152,12 +98,12 @@ class FSRegionScanner implements Runnable {
host = host.substring(0, host.length() - 1);
}
// Locality is fraction of blocks local to this host.
- float locality = ((float)entry.getValue().get()) / totalBlkCount;
+ float locality = ((float)entry.getValue().get()) / totalBlkCount.get();
hostLocalityMap.put(host, locality);
}
// Put the locality map into the result map, keyed by the encoded name
// of the region.
- regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
+ regionDegreeLocalityMapping.put(rfs.getRegionInfo().getEncodedName(), hostLocalityMap);
}
} catch (IOException e) {
LOG.warn("Problem scanning file system", e);