You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/03 02:53:51 UTC
[2/2] hbase git commit: HBASE-18784 if available,
query underlying outputstream capabilities where we need hflush/hsync.
HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.
* pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils
* refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils, as a side effect update it for Hadoop 2.8,3.0+
* refactor WALProcedureStore so that it handles its own FS interactions
* add a reflection-based lookup of stream capabilities
* call said lookup in places where we make WALs to make sure hflush/hsync is available.
* javadoc / checkstyle cleanup on changes as flagged by yetus
Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e79a007d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e79a007d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e79a007d
Branch: refs/heads/master
Commit: e79a007dd9810b33cd508986037e17d45b55a705
Parents: 0ff9dab
Author: Sean Busbey <bu...@apache.org>
Authored: Thu Oct 12 10:59:43 2017 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Thu Nov 2 21:29:20 2017 -0500
----------------------------------------------------------------------
.../apache/hadoop/hbase/util/CommonFSUtils.java | 890 +++++++++++++++++++
.../hadoop/hbase/util/TestCommonFSUtils.java | 164 ++++
.../procedure2/store/wal/WALProcedureStore.java | 70 +-
.../procedure2/ProcedureTestingUtility.java | 12 +-
.../hbase/procedure2/TestChildProcedures.java | 2 +-
.../hbase/procedure2/TestProcedureEvents.java | 2 +-
.../procedure2/TestProcedureExecution.java | 2 +-
.../hbase/procedure2/TestProcedureMetrics.java | 2 +-
.../hbase/procedure2/TestProcedureNonce.java | 2 +-
.../hbase/procedure2/TestProcedureRecovery.java | 2 +-
.../procedure2/TestProcedureReplayOrder.java | 2 +-
.../procedure2/TestStateMachineProcedure.java | 2 +-
.../hbase/procedure2/TestYieldProcedures.java | 2 +-
...ProcedureWALLoaderPerformanceEvaluation.java | 2 +-
.../wal/ProcedureWALPerformanceEvaluation.java | 9 +-
.../store/wal/TestStressWALProcedureStore.java | 2 +-
.../store/wal/TestWALProcedureStore.java | 4 +-
.../org/apache/hadoop/hbase/fs/HFileSystem.java | 3 -
.../hbase/io/asyncfs/AsyncFSOutputHelper.java | 11 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 18 +-
.../hadoop/hbase/master/MasterFileSystem.java | 10 +-
.../procedure/MasterProcedureConstants.java | 3 -
.../hbase/regionserver/wal/AbstractFSWAL.java | 29 +-
.../wal/AbstractProtobufLogWriter.java | 5 +-
.../wal/AsyncProtobufLogWriter.java | 5 +-
.../regionserver/wal/ProtobufLogWriter.java | 8 +-
.../org/apache/hadoop/hbase/util/FSUtils.java | 668 +-------------
.../hadoop/hbase/wal/AsyncFSWALProvider.java | 22 +-
.../apache/hadoop/hbase/wal/FSHLogProvider.java | 22 +-
.../hbase/io/asyncfs/TestLocalAsyncOutput.java | 4 +-
.../master/assignment/MockMasterServices.java | 5 +-
.../procedure/TestMasterProcedureWalLease.java | 4 +-
.../procedure/TestWALProcedureStoreOnHDFS.java | 2 +-
.../regionserver/wal/AbstractTestFSWAL.java | 45 +-
.../regionserver/wal/AbstractTestWALReplay.java | 11 +-
.../apache/hadoop/hbase/util/TestFSUtils.java | 187 ++--
.../apache/hadoop/hbase/wal/IOTestProvider.java | 14 +-
37 files changed, 1344 insertions(+), 903 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
new file mode 100644
index 0000000..bdf148e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -0,0 +1,890 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Utility methods for interacting with the underlying file system.
+ */
+@InterfaceAudience.Private
+public abstract class CommonFSUtils {
+ private static final Log LOG = LogFactory.getLog(CommonFSUtils.class);
+
+ /** Parameter name for HBase WAL directory */
+ public static final String HBASE_WAL_DIR = "hbase.wal.dir";
+
+ /** Full access permissions (starting point for a umask) */
+ public static final String FULL_RWX_PERMISSIONS = "777";
+
+ /**
+ * Protected constructor: this class is abstract and exposes static helpers only, so the
+ * constructor is reachable solely from subclasses.
+ */
+ protected CommonFSUtils() {
+ super();
+ }
+
+ /**
+  * Tests whether the URI path component of <code>path</code> begins with the URI path
+  * component of <code>rootPath</code>. Only the path components are compared, so a match is
+  * reported even if the two locations use different schemes. The check is a plain string
+  * prefix comparison.
+  * @param rootPath prefix to look for
+  * @param path path string to examine
+  * @return True if <code>path</code> starts with <code>rootPath</code>
+  */
+ public static boolean isStartingWithPath(final Path rootPath, final String path) {
+   final String prefix = rootPath.toUri().getPath();
+   final String candidate = new Path(path).toUri().getPath();
+   return candidate.startsWith(prefix);
+ }
+
+ /**
+  * String convenience overload: wraps <code>pathTail</code> in a {@link Path} and delegates to
+  * {@link #isMatchingTail(Path, Path)}.
+  * @param pathToSearch Path we will be trying to match against.
+  * @param pathTail what to match
+  * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
+  */
+ public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
+   final Path tail = new Path(pathTail);
+   return isMatchingTail(pathToSearch, tail);
+ }
+
+ /**
+ * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
+ * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider
+ * schema; i.e. if schemas different but path or subpath matches, the two will equate.
+ * Note that the two paths must report the same {@link Path#depth()}; otherwise this method
+ * returns false without comparing any component.
+ * @param pathToSearch Path we will be trying to match against
+ * @param pathTail what to match
+ * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
+ */
+ public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
+ // Paths of different depth can never match.
+ if (pathToSearch.depth() != pathTail.depth()) {
+ return false;
+ }
+ Path tailPath = pathTail;
+ String tailName;
+ Path toSearch = pathToSearch;
+ String toSearchName;
+ boolean result = false;
+ // Walk both paths upwards, comparing the last name component at each level.
+ do {
+ tailName = tailPath.getName();
+ // An empty tail name means every component compared so far was equal: report a match.
+ if (tailName == null || tailName.length() <= 0) {
+ result = true;
+ break;
+ }
+ toSearchName = toSearch.getName();
+ // The searched path ran out of names while the tail still has one: no match.
+ if (toSearchName == null || toSearchName.length() <= 0) {
+ break;
+ }
+ // Move up a parent on each path for next go around. Path doesn't let us go off the end.
+ tailPath = tailPath.getParent();
+ toSearch = toSearch.getParent();
+ // The names compared here are the ones read before moving up to the parents.
+ } while(tailName.equals(toSearchName));
+ return result;
+ }
+
+ /**
+  * Recursively deletes <code>dir</code>, but only if it currently exists.
+  * @param fs filesystem object
+  * @param dir directory to delete
+  * @return True if <code>dir</code> existed and the delete call reported success
+  * @throws IOException e
+  */
+ public static boolean deleteDirectory(final FileSystem fs, final Path dir)
+     throws IOException {
+   if (!fs.exists(dir)) {
+     return false;
+   }
+   return fs.delete(dir, true);
+ }
+
+ /**
+  * Return the number of bytes that large input files should optimally be split into to
+  * minimize i/o time.
+  *
+  * Reflection is used to look up <code>getDefaultBlockSize(Path)</code> on the runtime class of
+  * <code>fs</code>. When the lookup fails, the method is called directly on <code>fs</code>
+  * instead.
+  *
+  * @param fs filesystem object
+  * @param path path to ask the filesystem about
+  * @return the default block size for the path's filesystem
+  * @throws IOException wrapping any exception raised by the reflective invocation
+  */
+ public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
+   Method blockSizeMethod;
+   try {
+     blockSizeMethod = fs.getClass().getMethod("getDefaultBlockSize", Path.class);
+   } catch (NoSuchMethodException e) {
+     LOG.info("FileSystem doesn't support getDefaultBlockSize");
+     blockSizeMethod = null;
+   } catch (SecurityException e) {
+     LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
+     blockSizeMethod = null;
+   }
+   if (blockSizeMethod == null) {
+     return fs.getDefaultBlockSize(path);
+   }
+   try {
+     final Long blockSize = (Long) blockSizeMethod.invoke(fs, path);
+     return blockSize.longValue();
+   } catch (Exception e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Get the default replication.
+  *
+  * Reflection is used to look up <code>getDefaultReplication(Path)</code> on the runtime class
+  * of <code>fs</code>. When the lookup fails, the method is called directly on <code>fs</code>
+  * instead.
+  *
+  * @param fs filesystem object
+  * @param path path of file
+  * @return default replication for the path's filesystem
+  * @throws IOException wrapping any exception raised by the reflective invocation
+  */
+ public static short getDefaultReplication(final FileSystem fs, final Path path)
+     throws IOException {
+   Method replicationMethod;
+   try {
+     replicationMethod = fs.getClass().getMethod("getDefaultReplication", Path.class);
+   } catch (NoSuchMethodException e) {
+     LOG.info("FileSystem doesn't support getDefaultReplication");
+     replicationMethod = null;
+   } catch (SecurityException e) {
+     LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
+     replicationMethod = null;
+   }
+   if (replicationMethod == null) {
+     return fs.getDefaultReplication(path);
+   }
+   try {
+     final Number replication = (Number) replicationMethod.invoke(fs, path);
+     return replication.shortValue();
+   } catch (Exception e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Returns the default buffer size to use during writes.
+  *
+  * The value is read from the <code>io.file.buffer.size</code> property of the filesystem's
+  * configuration; 4096 is used when the property is not set.
+  *
+  * @param fs filesystem object
+  * @return default buffer size to use during writes
+  */
+ public static int getDefaultBufferSize(final FileSystem fs) {
+   final Configuration fsConf = fs.getConf();
+   return fsConf.getInt("io.file.buffer.size", 4096);
+ }
+
+ /**
+  * Create the specified file on the filesystem. The file is created with:
+  * <ol>
+  * <li>the permission passed in as <code>perm</code></li>
+  * <li>the fs configured buffer size (or 4096 if not set)</li>
+  * <li>the default replication for <code>path</code></li>
+  * <li>the default block size for <code>path</code></li>
+  * <li>no progress tracking</li>
+  * </ol>
+  *
+  * @param fs {@link FileSystem} on which to write the file
+  * @param path {@link Path} to the file to write
+  * @param perm initial permissions
+  * @param overwrite Whether or not the created file should be overwritten.
+  * @return output stream to the created file
+  * @throws IOException if the file cannot be created
+  */
+ public static FSDataOutputStream create(FileSystem fs, Path path,
+     FsPermission perm, boolean overwrite) throws IOException {
+   if (LOG.isTraceEnabled()) {
+     LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
+   }
+   final int bufferSize = getDefaultBufferSize(fs);
+   final short replication = getDefaultReplication(fs, path);
+   final long blockSize = getDefaultBlockSize(fs, path);
+   return fs.create(path, perm, overwrite, bufferSize, replication, blockSize, null);
+ }
+
+ /**
+  * Get the file permissions specified in the configuration, if they are enabled.
+  *
+  * @param fs filesystem that the file will be created on (not consulted by this method)
+  * @param conf configuration to read for determining if permissions are
+  *          enabled and which to use
+  * @param permssionConfKey property key in the configuration to use when
+  *          finding the permission
+  * @return the permission to use when creating a new file on the fs. If
+  *         special permissions are not enabled, not specified, or not parseable, the
+  *         default file permissions are returned.
+  */
+ public static FsPermission getFilePermissions(final FileSystem fs,
+     final Configuration conf, final String permssionConfKey) {
+   if (!conf.getBoolean(HConstants.ENABLE_DATA_FILE_UMASK, false)) {
+     return FsPermission.getFileDefault();
+   }
+   try {
+     final FsPermission fullAccess = new FsPermission(FULL_RWX_PERMISSIONS);
+     // Without a configured mask there is nothing to apply; go default.
+     final String mask = conf.get(permssionConfKey);
+     if (mask == null) {
+       return FsPermission.getFileDefault();
+     }
+     // Apply the umask to the full-access starting point.
+     return fullAccess.applyUMask(new FsPermission(mask));
+   } catch (IllegalArgumentException e) {
+     LOG.warn("Incorrect umask attempted to be created: " + conf.get(permssionConfKey)
+         + ", using default file permissions.", e);
+     return FsPermission.getFileDefault();
+   }
+ }
+
+ /**
+  * Verifies root directory path is a valid URI with a scheme.
+  *
+  * @param root root directory path
+  * @return Passed <code>root</code> argument.
+  * @throws IOException if <code>root</code> is not a valid URI, or is a URI without a scheme
+  */
+ public static Path validateRootPath(Path root) throws IOException {
+   final URI rootUri;
+   try {
+     rootUri = new URI(root.toString());
+   } catch (URISyntaxException e) {
+     throw new IOException("Root directory path is not a valid " +
+         "URI -- check your " + HConstants.HBASE_DIR + " configuration", e);
+   }
+   if (rootUri.getScheme() == null) {
+     throw new IOException("Root directory does not have a scheme");
+   }
+   return root;
+ }
+
+ /**
+  * Checks for the presence of the WAL log root path (using the provided conf object) in the given
+  * path. If it exists, this method removes it (together with the separating "/") and returns the
+  * String representation of the remaining relative path.
+  * <p>
+  * A path that merely shares a textual prefix with the WAL root (e.g. a sibling directory whose
+  * name starts with the root's name) is returned unchanged, and a path equal to the WAL root
+  * yields the empty string.
+  * @param path must not be null
+  * @param conf must not be null
+  * @return String representation of the remaining relative path, or the unmodified path string
+  *         if <code>path</code> is not located under the WAL root
+  * @throws IOException from underlying filesystem
+  */
+ public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
+   final String rootStr = getWALRootDir(conf).toString();
+   final String pathStr = path.toString();
+   // Not under the WAL root at all: return as it is.
+   if (!pathStr.startsWith(rootStr)) {
+     return pathStr;
+   }
+   // The path IS the WAL root; nothing remains once the root is removed.
+   if (pathStr.length() == rootStr.length()) {
+     return "";
+   }
+   // A root that already ends with the separator (e.g. "hdfs://host/") needs no extra skip.
+   if (rootStr.endsWith("/")) {
+     return pathStr.substring(rootStr.length());
+   }
+   // Shared textual prefix only (e.g. root "/wal" vs path "/wal2/x"): not under the root.
+   if (pathStr.charAt(rootStr.length()) != '/') {
+     return pathStr;
+   }
+   return pathStr.substring(rootStr.length() + 1); // remove the "/" too.
+ }
+
+ /**
+  * Return the 'path' component of a Path. In Hadoop, Path is an URI. For a Path such as
+  * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code> this method
+  * returns <code>/hbase_trunk/TestTable/compaction.dir</code>. Handy for printing a Path
+  * without the qualifying Filesystem instance.
+  * @param p Filesystem Path whose 'path' component we are to return.
+  * @return Path portion of the Filesystem
+  */
+ public static String getPath(Path p) {
+   final URI uri = p.toUri();
+   return uri.getPath();
+ }
+
+ /**
+  * Reads {@link HConstants#HBASE_DIR} from the configuration and qualifies it against the
+  * filesystem that owns it.
+  * @param c configuration
+  * @return {@link Path} to hbase root directory from
+  *         configuration as a qualified Path.
+  * @throws IOException e
+  */
+ public static Path getRootDir(final Configuration c) throws IOException {
+   final Path rootDir = new Path(c.get(HConstants.HBASE_DIR));
+   final FileSystem rootFs = rootDir.getFileSystem(c);
+   return rootDir.makeQualified(rootFs);
+ }
+
+ /**
+  * Stores <code>root</code> in the configuration under {@link HConstants#HBASE_DIR}.
+  * @param c configuration to modify
+  * @param root new hbase root directory
+  * @throws IOException declared for callers; not thrown by the current body
+  */
+ public static void setRootDir(final Configuration c, final Path root) throws IOException {
+   final String rootStr = root.toString();
+   c.set(HConstants.HBASE_DIR, rootStr);
+ }
+
+ /**
+  * Stores <code>root</code> in the configuration under the <code>fs.defaultFS</code> key.
+  * @param c configuration to modify
+  * @param root value to use as the default filesystem
+  * @throws IOException declared for callers; not thrown by the current body
+  */
+ public static void setFsDefault(final Configuration c, final Path root) throws IOException {
+   final String defaultFs = root.toString();
+   c.set("fs.defaultFS", defaultFs); // for hadoop 0.21+
+ }
+
+ /**
+  * @param c configuration
+  * @return the {@link FileSystem} that owns the hbase root directory
+  * @throws IOException from underlying FileSystem
+  */
+ public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
+   return getRootDir(c).getFileSystem(c);
+ }
+
+ /**
+  * Resolves the WAL root directory: the value of {@value HBASE_WAL_DIR} if set, otherwise the
+  * hbase root directory setting.
+  * @param c configuration
+  * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from
+  *         configuration as a qualified Path. Defaults to HBase root dir.
+  * @throws IOException e
+  */
+ public static Path getWALRootDir(final Configuration c) throws IOException {
+   final String configured = c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR));
+   final Path walDir = new Path(configured);
+   if (isValidWALRootDir(walDir, c)) {
+     final FileSystem walFs = walDir.getFileSystem(c);
+     return walDir.makeQualified(walFs);
+   }
+   return getRootDir(c);
+ }
+
+ /**
+  * Stores <code>root</code> in the configuration under {@value HBASE_WAL_DIR}.
+  * @param c configuration to modify
+  * @param root new WAL root directory
+  * @throws IOException declared for callers; not thrown by the current body
+  */
+ @VisibleForTesting
+ public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
+   final String walRoot = root.toString();
+   c.set(HBASE_WAL_DIR, walRoot);
+ }
+
+ /**
+  * @param c configuration
+  * @return the {@link FileSystem} that owns the WAL root directory
+  * @throws IOException from underlying FileSystem
+  */
+ public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
+   return getWALRootDir(c).getFileSystem(c);
+ }
+
+ /**
+  * Rejects a WAL directory that is nested underneath the hbase root directory.
+  * @param walDir candidate WAL directory
+  * @param c configuration used to resolve the hbase root directory
+  * @return always true when the method returns normally
+  * @throws IllegalStateException if <code>walDir</code> is a strict descendant of the root dir
+  * @throws IOException if the root directory cannot be resolved
+  */
+ private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
+   final Path rootDir = getRootDir(c);
+   // Compare by value; a reference comparison is always true for distinct Path instances.
+   if (!walDir.equals(rootDir)
+       && walDir.toString().startsWith(rootDir.toString() + "/")) {
+     throw new IllegalStateException("Illegal WAL directory specified. " +
+         "WAL directories are not permitted to be under the root directory if set.");
+   }
+   return true;
+ }
+
+ /**
+  * Builds the table directory: the table's qualifier placed under the directory of the
+  * table's namespace, itself under <code>rootdir</code>.
+  *
+  * @param rootdir qualified path of HBase root directory
+  * @param tableName name of table
+  * @return {@link org.apache.hadoop.fs.Path} for table
+  */
+ public static Path getTableDir(Path rootdir, final TableName tableName) {
+   final Path namespaceDir = getNamespaceDir(rootdir, tableName.getNamespaceAsString());
+   return new Path(namespaceDir, tableName.getQualifierAsString());
+ }
+
+ /**
+  * Derives a {@link org.apache.hadoop.hbase.TableName} from a table directory: the name of
+  * the parent directory is used as the namespace and the directory's own name as the
+  * qualifier.
+  *
+  * @param tablePath path of table
+  * @return {@link org.apache.hadoop.hbase.TableName} for the table directory
+  */
+ public static TableName getTableName(Path tablePath) {
+   final String namespace = tablePath.getParent().getName();
+   final String qualifier = tablePath.getName();
+   return TableName.valueOf(namespace, qualifier);
+ }
+
+ /**
+  * Builds the namespace directory: <code>namespace</code> placed under
+  * {@link HConstants#BASE_NAMESPACE_DIR}, itself under <code>rootdir</code>.
+  *
+  * @param rootdir qualified path of HBase root directory
+  * @param namespace namespace name
+  * @return {@link org.apache.hadoop.fs.Path} for the namespace
+  */
+ public static Path getNamespaceDir(Path rootdir, final String namespace) {
+   final Path relativeDir = new Path(HConstants.BASE_NAMESPACE_DIR, new Path(namespace));
+   return new Path(rootdir, relativeDir);
+ }
+
+ /**
+  * Sets storage policy for given path according to config setting.
+  * If the passed path is a directory, we'll set the storage policy for all files
+  * created in the future in said directory. Note that this change in storage
+  * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
+  * If we're running on a FileSystem implementation that doesn't support the given storage policy
+  * (or storage policies at all), then we'll issue a log message and continue.
+  *
+  * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+  *
+  * @param fs We only do anything if it implements a setStoragePolicy method
+  * @param conf used to look up storage policy with given key; not modified.
+  * @param path the Path whose storage policy is to be set
+  * @param policyKey Key to use pulling a policy from Configuration:
+  *          e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
+  * @param defaultPolicy if the configured policy (upper-cased) is equal to this policy name, we
+  *          will skip telling the FileSystem to set a storage policy.
+  */
+ public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
+     final Path path, final String policyKey, final String defaultPolicy) {
+   final String configured = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
+   if (!configured.equals(defaultPolicy)) {
+     setStoragePolicy(fs, path, configured);
+     return;
+   }
+   if (LOG.isTraceEnabled()) {
+     LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
+   }
+ }
+
+ // this mapping means that under a federated FileSystem implementation, we'll
+ // only log the first failure from any of the underlying FileSystems at WARN and all others
+ // will be at DEBUG.
+ private static final Map<FileSystem, Boolean> warningMap =
+ new ConcurrentHashMap<FileSystem, Boolean>();
+
+ /**
+  * Sets storage policy for given path.
+  * If the passed path is a directory, we'll set the storage policy for all files
+  * created in the future in said directory. Note that this change in storage
+  * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
+  * If we're running on a version of FileSystem that doesn't support the given storage policy
+  * (or storage policies at all), then we'll issue a log message and continue.
+  * A null or blank policy is ignored (logged at TRACE only).
+  *
+  * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+  *
+  * @param fs We only do anything if it implements a setStoragePolicy method
+  * @param path the Path whose storage policy is to be set
+  * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
+  *          org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
+  *          'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+  */
+ public static void setStoragePolicy(final FileSystem fs, final Path path,
+     final String storagePolicy) {
+   if (storagePolicy == null) {
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("We were passed a null storagePolicy, exiting early.");
+     }
+     return;
+   }
+   final String policy = storagePolicy.trim();
+   if (!policy.isEmpty()) {
+     invokeSetStoragePolicy(fs, path, policy);
+     return;
+   }
+   if (LOG.isTraceEnabled()) {
+     LOG.trace("We were passed an empty storagePolicy, exiting early.");
+   }
+ }
+
+ /*
+ * All args have been checked and are good. Run the setStoragePolicy invocation.
+ *
+ * The method is looked up reflectively as setStoragePolicy(Path, String) on the runtime class
+ * of fs. Lookup and invocation failures are logged, never rethrown: the first failure seen for
+ * a given FileSystem (tracked in warningMap) is logged at WARN, later ones at DEBUG.
+ * NOTE(review): getDeclaredMethod only sees methods declared by the runtime class itself, not
+ * inherited ones -- confirm that this is intended for wrapping FileSystem implementations.
+ */
+ private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
+ final String storagePolicy) {
+ Method m = null;
+ try {
+ m = fs.getClass().getDeclaredMethod("setStoragePolicy",
+ new Class<?>[] { Path.class, String.class });
+ m.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
+ "not available. This is normal and expected on earlier Hadoop versions.";
+ // WARN once per FileSystem instance, DEBUG afterwards.
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn(msg, e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ }
+ m = null;
+ } catch (SecurityException e) {
+ final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
+ "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
+ "to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
+ "logs that include this message and period of time before it. Logs around service " +
+ "start up will probably be useful as well.";
+ // WARN once per FileSystem instance, DEBUG afterwards.
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn(msg, e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ }
+ m = null; // could happen on setAccessible() or getDeclaredMethod()
+ }
+ // Only attempt the call when the lookup above succeeded.
+ if (m != null) {
+ try {
+ m.invoke(fs, path, storagePolicy);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
+ }
+ } catch (Exception e) {
+ // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
+ // misuse than a runtime problem with HDFS.
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
+ "DEBUG log level might have more details.", e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+ }
+ // The remaining checks only add DEBUG-level hints about the likely cause.
+ // check for lack of HDFS-7228
+ if (e instanceof InvocationTargetException) {
+ // The cause is the exception thrown by the invoked setStoragePolicy itself.
+ final Throwable exception = e.getCause();
+ if (exception instanceof RemoteException &&
+ HadoopIllegalArgumentException.class.getName().equals(
+ ((RemoteException)exception).getClassName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
+ "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
+ "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
+ "more information see the 'ArchivalStorage' docs for your Hadoop release.");
+ }
+ // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
+ // that throws UnsupportedOperationException
+ } else if (exception instanceof UnsupportedOperationException) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The underlying FileSystem implementation doesn't support " +
+ "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
+ "appears to be present in your version of Hadoop. For more information check " +
+ "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
+ "specification docs from HADOOP-11981, and/or related documentation from the " +
+ "provider of the underlying FileSystem (its name should appear in the " +
+ "stacktrace that accompanies this message). Note in particular that Hadoop's " +
+ "local filesystem implementation doesn't support storage policies.", exception);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Reports whether the default filesystem obtained from <code>conf</code> uses the 'hdfs'
+  * scheme (compared case-insensitively). A filesystem URI without a scheme is not HDFS.
+  * @param conf must not be null
+  * @return True if this filesystem whose scheme is 'hdfs'.
+  * @throws IOException from underlying FileSystem
+  */
+ public static boolean isHDFS(final Configuration conf) throws IOException {
+   final FileSystem fs = FileSystem.get(conf);
+   final String scheme = fs.getUri().getScheme();
+   // Literal first: getScheme() returns null for a URI without a scheme.
+   return "hdfs".equalsIgnoreCase(scheme);
+ }
+
+ /**
+  * Checks whether the string form of the given path contains
+  * {@link HConstants#RECOVERED_EDITS_DIR}.
+  * @param path must not be null
+  * @return True if we recovered edits
+  */
+ public static boolean isRecoveredEdits(Path path) {
+   final String pathStr = path.toString();
+   return pathStr.contains(HConstants.RECOVERED_EDITS_DIR);
+ }
+
+ /**
+  * @param conf must not be null
+  * @return Returns the filesystem of the hbase rootdir.
+  * @throws IOException from underlying FileSystem
+  */
+ public static FileSystem getCurrentFileSystem(Configuration conf)
+     throws IOException {
+   final Path rootDir = getRootDir(conf);
+   return rootDir.getFileSystem(conf);
+ }
+
+ /**
+  * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+  * This accommodates differences between hadoop versions, where hadoop 1
+  * does not throw a FileNotFoundException, and return an empty FileStatus[]
+  * while Hadoop 2 will throw FileNotFoundException.
+  *
+  * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
+  * Path, FileStatusFilter) instead.
+  *
+  * @param fs file system
+  * @param dir directory
+  * @param filter path filter; when null the directory is listed unfiltered
+  * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+  * @throws IOException from underlying FileSystem, other than FileNotFoundException
+  */
+ public static FileStatus [] listStatus(final FileSystem fs,
+     final Path dir, final PathFilter filter) throws IOException {
+   final FileStatus[] found;
+   try {
+     found = (filter != null) ? fs.listStatus(dir, filter) : fs.listStatus(dir);
+   } catch (FileNotFoundException fnfe) {
+     // A missing directory is reported to the caller as null.
+     if (LOG.isTraceEnabled()) {
+       LOG.trace(dir + " doesn't exist");
+     }
+     return null;
+   }
+   return (found != null && found.length >= 1) ? found : null;
+ }
+
+ /**
+  * Unfiltered variant of {@link #listStatus(FileSystem, Path, PathFilter)}: calls
+  * fs.listStatus() and treats FileNotFoundException as non-fatal.
+  *
+  * @param fs file system
+  * @param dir directory
+  * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+  * @throws IOException from underlying FileSystem, other than FileNotFoundException
+  */
+ public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
+   final PathFilter noFilter = null;
+   return listStatus(fs, dir, noFilter);
+ }
+
+ /**
+  * Calls fs.listFiles() (non-recursive) to get FileStatus and BlockLocations together for
+  * reducing rpc call. A FileNotFoundException is treated as non-fatal.
+  *
+  * @param fs file system
+  * @param dir directory
+  * @return LocatedFileStatus list, or null if nothing was listed (including when the
+  *         directory doesn't exist)
+  * @throws IOException from underlying FileSystem, other than FileNotFoundException
+  */
+ public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
+     final Path dir) throws IOException {
+   List<LocatedFileStatus> found = null;
+   try {
+     for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false); it.hasNext();) {
+       // The list is created lazily so that an empty listing yields null.
+       if (found == null) {
+         found = Lists.newArrayList();
+       }
+       found.add(it.next());
+     }
+   } catch (FileNotFoundException fnfe) {
+     // A missing directory is not an error here; whatever was collected is returned.
+     if (LOG.isTraceEnabled()) {
+       LOG.trace(dir + " doesn't exist");
+     }
+   }
+   return found;
+ }
+
+ /**
+  * Thin wrapper around fs.delete().
+  *
+  * @param fs must not be null
+  * @param path must not be null
+  * @param recursive delete tree rooted at path
+  * @return the value returned by the fs.delete()
+  * @throws IOException from underlying FileSystem
+  */
+ public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
+     throws IOException {
+   final boolean deleted = fs.delete(path, recursive);
+   return deleted;
+ }
+
+ /**
+  * Thin wrapper around fs.exists(): checks if the specified path exists.
+  *
+  * @param fs must not be null
+  * @param path must not be null
+  * @return the value returned by fs.exists()
+  * @throws IOException from underlying FileSystem
+  */
+ public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
+   final boolean present = fs.exists(path);
+   return present;
+ }
+
+ /**
+  * Log (at DEBUG) the current state of the filesystem from a certain root directory.
+  * @param fs filesystem to investigate
+  * @param root root file/directory to start logging from
+  * @param LOG log to output information
+  * @throws IOException if an unexpected exception occurs
+  */
+ public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
+     throws IOException {
+   final String topLevelPrefix = "|-";
+   LOG.debug("Current file system:");
+   logFSTree(LOG, fs, root, topLevelPrefix);
+ }
+
+ /**
+  * Recursive helper to log the state of the FS. Each entry under <code>root</code> is logged
+  * at DEBUG with <code>prefix</code> prepended; directories get a trailing "/" and are
+  * descended into with the prefix extended by "---".
+  *
+  * @see #logFileSystemState(FileSystem, Path, Log)
+  */
+ private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
+     throws IOException {
+   final FileStatus[] children = listStatus(fs, root, null);
+   if (children == null) {
+     return;
+   }
+
+   for (FileStatus child : children) {
+     if (!child.isDirectory()) {
+       LOG.debug(prefix + child.getPath().getName());
+       continue;
+     }
+     LOG.debug(prefix + child.getPath().getName() + "/");
+     logFSTree(LOG, fs, child.getPath(), prefix + "---");
+   }
+ }
+
+ /**
+  * Stamps {@code src} with the current time as its modification time and then renames it to
+  * {@code dest}. The timestamp is applied before the rename and is not rolled back if the
+  * rename reports failure.
+  *
+  * @param fs filesystem holding both paths
+  * @param src path to stamp and move
+  * @param dest path to move {@code src} to
+  * @return the value returned by fs.rename()
+  * @throws IOException from underlying FileSystem
+  */
+ public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
+     throws IOException {
+   // set the modify time for TimeToLive Cleaner
+   final long modifyTime = EnvironmentEdgeManager.currentTime();
+   // the -1 is passed as the access-time argument of FileSystem.setTimes
+   final long accessTime = -1;
+   fs.setTimes(src, modifyTime, accessTime);
+   final boolean renamed = fs.rename(src, dest);
+   return renamed;
+ }
+
+ /**
+  * Prepares the configuration for short circuit reads.
+  * Warns (and trips an assertion when assertions are enabled) if the user asked HDFS to skip
+  * checksums on short circuit reads, then makes sure a read buffer size is in place.
+  * @param conf must not be null
+  */
+ public static void setupShortCircuitRead(final Configuration conf) {
+   // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
+   final boolean skipChecksum =
+       conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
+   final boolean hbaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+   if (skipChecksum) {
+     String warning = "Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
+         "be set to true.";
+     if (hbaseChecksum) {
+       // only point at the JIRA when HBase-level checksums are what the cluster is using
+       warning += " HBase checksum doesn't require " +
+           "it, see https://issues.apache.org/jira/browse/HBASE-6868.";
+     }
+     LOG.warn(warning);
+     assert !skipChecksum; //this will fail if assertions are on
+   }
+   checkShortCircuitReadBufferSize(conf);
+ }
+
+ /**
+  * Leaves an explicitly configured short circuit read buffer size alone; otherwise installs the
+  * hbase value, read from the same key prefixed with "hbase." and defaulting to twice
+  * {@code HConstants.DEFAULT_BLOCKSIZE}.
+  * @param conf must not be null
+  */
+ public static void checkShortCircuitReadBufferSize(final Configuration conf) {
+   // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
+   final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
+   final int unset = -1;
+   final int hbaseDefault = HConstants.DEFAULT_BLOCKSIZE * 2;
+   // Anything other than the sentinel means a size was configured -- we will use it as is.
+   if (conf.getInt(dfsKey, unset) == unset) {
+     // Short circuit buffer size is normally not set. Put in place the hbase wanted size.
+     final int wanted = conf.getInt("hbase." + dfsKey, hbaseDefault);
+     conf.setIfUnset(dfsKey, Integer.toString(wanted));
+   }
+ }
+
+ // Holder singleton idiom. JVM spec ensures the static initializer below runs at most once per
+ // Classloader, and not until something first references this class. It records whether
+ // o.a.h.fs.StreamCapabilities and its hasCapability(String) method could be found reflectively.
+ private static class StreamCapabilities {
+   public static final boolean PRESENT;
+   public static final Class<?> CLASS;
+   public static final Method METHOD;
+   static {
+     // Work on locals and publish them in the finally block so each final is assigned once.
+     boolean found = false;
+     Class<?> capabilitiesClass = null;
+     Method hasCapabilityMethod = null;
+     try {
+       capabilitiesClass = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+       hasCapabilityMethod = capabilitiesClass.getMethod("hasCapability", String.class);
+       // only reached when both the class and the method were resolved
+       found = true;
+     } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
+       LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
+           "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
+           "support hflush/hsync. If you are running on top of HDFS this probably just " +
+           "means you have an older version and this can be ignored. If you are running on " +
+           "top of an alternate FileSystem implementation you should manually verify that " +
+           "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
+           "diagnose errors when our assumptions are violated.");
+       LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
+           exception);
+     } finally {
+       PRESENT = found;
+       CLASS = capabilitiesClass;
+       METHOD = hasCapabilityMethod;
+     }
+   }
+ }
+
+ /**
+  * Asks a stream whether it supports a capability, when the running Hadoop version has the
+  * StreamCapabilities class to ask with.
+  * @param stream capabilities are per-stream instance, so this exact stream is queried. must not
+  *   be null
+  * @param capability what to look for, per Hadoop Common's FileSystem docs
+  * @return true when StreamCapabilities is not available at all. false when it is available but
+  *   the stream does not implement it or the reflective call fails. otherwise the answer
+  *   given by the stream itself.
+  */
+ public static boolean hasCapability(FSDataOutputStream stream, String capability) {
+   // be consistent whether or not StreamCapabilities is present
+   if (stream == null) {
+     throw new NullPointerException("stream parameter must not be null.");
+   }
+   // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
+   // otherwise old versions of Hadoop will break.
+   if (!StreamCapabilities.PRESENT) {
+     return true;
+   }
+   // StreamCapabilities is present from here on. A stream that doesn't implement it is
+   // treated as declaring nothing.
+   if (!StreamCapabilities.CLASS.isInstance(stream)) {
+     return false;
+   }
+   try {
+     final Boolean answer = (Boolean) StreamCapabilities.METHOD.invoke(stream, capability);
+     return answer.booleanValue();
+   } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
+       exception) {
+     // a failed invocation is treated the same as not declaring the capability
+     LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
+         "our understanding of how it's supposed to work. Please file a JIRA and include " +
+         "the following stack trace. In the mean time we're interpreting this behavior " +
+         "difference as a lack of capability support, which will probably cause a failure.",
+         exception);
+     return false;
+   }
+ }
+
+ /**
+  * Checked exception for code that detects a missing stream capability but lacks the context
+  * to explain the impact and mitigation; callers with that context can catch and elaborate.
+  */
+ public static class StreamLacksCapabilityException extends Exception {
+   /**
+    * @param message description of the capability that is missing
+    */
+   public StreamLacksCapabilityException(String message) {
+     super(message);
+   }
+
+   /**
+    * @param message description of the capability that is missing
+    * @param cause underlying problem that surfaced the lack
+    */
+   public StreamLacksCapabilityException(String message, Throwable cause) {
+     super(message, cause);
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
new file mode 100644
index 0000000..7ff5792
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
@@ -0,0 +1,164 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test {@link CommonFSUtils}.
+ */
+@Category({MiscTests.class, MediumTests.class})
+public class TestCommonFSUtils {
+ private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class);
+
+ private HBaseCommonTestingUtility htu;
+ private Configuration conf;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ conf = htu.getConfiguration();
+ }
+
+ /**
+ * Test path compare and prefix checking.
+ */
+ @Test
+ public void testMatchingTail() throws IOException {
+ Path rootdir = htu.getDataTestDir();
+ final FileSystem fs = rootdir.getFileSystem(conf);
+ assertTrue(rootdir.depth() > 1);
+ Path partPath = new Path("a", "b");
+ Path fullPath = new Path(rootdir, partPath);
+ Path fullyQualifiedPath = fs.makeQualified(fullPath);
+ assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath));
+ assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString()));
+ assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString()));
+ assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
+ assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString()));
+ assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath));
+ assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
+ assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
+ assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
+ assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
+ assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x")));
+ assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath));
+ }
+
+ private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize)
+ throws Exception {
+ FSDataOutputStream out = fs.create(file);
+ byte [] data = new byte[dataSize];
+ out.write(data, 0, dataSize);
+ out.close();
+ }
+
+ @Test
+ public void testSetWALRootDir() throws Exception {
+ Path p = new Path("file:///hbase/root");
+ CommonFSUtils.setWALRootDir(conf, p);
+ assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR));
+ }
+
+ @Test
+ public void testGetWALRootDir() throws IOException {
+ Path root = new Path("file:///hbase/root");
+ Path walRoot = new Path("file:///hbase/logroot");
+ CommonFSUtils.setRootDir(conf, root);
+ assertEquals(CommonFSUtils.getRootDir(conf), root);
+ assertEquals(CommonFSUtils.getWALRootDir(conf), root);
+ CommonFSUtils.setWALRootDir(conf, walRoot);
+ assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testGetWALRootDirIllegalWALDir() throws IOException {
+ Path root = new Path("file:///hbase/root");
+ Path invalidWALDir = new Path("file:///hbase/root/logroot");
+ CommonFSUtils.setRootDir(conf, root);
+ CommonFSUtils.setWALRootDir(conf, invalidWALDir);
+ CommonFSUtils.getWALRootDir(conf);
+ }
+
+ @Test
+ public void testRemoveWALRootPath() throws Exception {
+ CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase"));
+ Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile");
+ Path tmpFile = new Path("file:///test/testfile");
+ assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile");
+ assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
+ CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
+ assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString());
+ Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
+ assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog");
+ }
+
+ @Test(expected=NullPointerException.class)
+ public void streamCapabilitiesDoesNotAllowNullStream() {
+ CommonFSUtils.hasCapability(null, "hopefully any string");
+ }
+
+ private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
+ static {
+ boolean tmp = false;
+ try {
+ Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+ tmp = true;
+ LOG.debug("Test thought StreamCapabilities class was present.");
+ } catch (ClassNotFoundException exception) {
+ LOG.debug("Test didn't think StreamCapabilities class was present.");
+ } finally {
+ STREAM_CAPABILITIES_IS_PRESENT = tmp;
+ }
+ }
+
+ @Test
+ public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
+ FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
+ assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+ "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+ CommonFSUtils.hasCapability(stream, "hsync"));
+ assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+ "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+ CommonFSUtils.hasCapability(stream, "hflush"));
+ assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+ "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+ CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
+ "implement."));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 974bc13..f49833c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* WAL implementation of the ProcedureStore.
@@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
public class WALProcedureStore extends ProcedureStoreBase {
private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
public static final String LOG_PREFIX = "pv2-";
+ /** Used to construct the name of the log directory for master procedures */
+ public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
+
public interface LeaseRecovery {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
- public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
- final LeaseRecovery leaseRecovery) {
- this(conf, fs, walDir, null, leaseRecovery);
+ /**
+  * Creates a store whose procedure WALs live in {@link #MASTER_PROCEDURE_LOGDIR} under the
+  * configured WAL root directory and whose archive directory is the old-log directory under
+  * that same WAL root.
+  * @param conf configuration the WAL root directory is read from
+  * @param leaseRecovery callback used to recover file leases on existing logs
+  * @throws IOException if the directories cannot be resolved or prepared
+  */
+ public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
+     throws IOException {
+   // The delegate constructor takes its FileSystem from walDir and uses that same FileSystem
+   // to check/create the archive dir, so both paths are built from the WAL root. Building the
+   // archive dir from the data root would point at a different filesystem whenever the WAL
+   // dir is configured separately.
+   this(conf,
+       new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
+       new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
+       leaseRecovery);
+ }
- public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
- final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
- this.fs = fs;
+ @VisibleForTesting
+ public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
+ final LeaseRecovery leaseRecovery) throws IOException {
this.conf = conf;
+ this.leaseRecovery = leaseRecovery;
this.walDir = walDir;
this.walArchiveDir = walArchiveDir;
- this.leaseRecovery = leaseRecovery;
+ this.fs = walDir.getFileSystem(conf);
+
+ // Create the log directory for the procedure store
+ if (!fs.exists(walDir)) {
+ if (!fs.mkdirs(walDir)) {
+ throw new IOException("Unable to mkdir " + walDir);
+ }
+ }
+ // Now that it exists, set the log policy
+ CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+ HConstants.DEFAULT_WAL_STORAGE_POLICY);
+
+ // Create archive dir up front. Rename won't work w/o it up on HDFS.
+ if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
+ if (this.fs.mkdirs(this.walArchiveDir)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
+ }
+ } else {
+ LOG.warn("Failed create of " + this.walArchiveDir);
+ }
+ }
}
@Override
@@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
};
syncThread.start();
-
- // Create archive dir up front. Rename won't work w/o it up on HDFS.
- if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
- if (this.fs.mkdirs(this.walArchiveDir)) {
- if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
- this.walArchiveDir);
- } else {
- LOG.warn("Failed create of " + this.walArchiveDir);
- }
- }
}
@Override
@@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
LOG.warn("failed to create log file with id=" + logId, re);
return false;
}
+ // After we create the stream but before we attempt to use it at all
+ // ensure that we can provide the level of data safety we're configured
+ // to provide.
+ final String durability = useHsync ? "hsync" : "hflush";
+ if (!(CommonFSUtils.hasCapability(newStream, durability))) {
+ throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
+ " for proper operation during component failures, but the underlying filesystem does " +
+ "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
+ "' to set the desired level of robustness and ensure the config value of '" +
+ CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
+ }
try {
ProcedureWALFormat.writeHeader(newStream, header);
startPos = newStream.getPos();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 99d3c28..6e0c02e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -51,14 +51,14 @@ public class ProcedureTestingUtility {
private ProcedureTestingUtility() {
}
- public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
- final Path baseDir) throws IOException {
- return createWalStore(conf, fs, baseDir);
+ public static ProcedureStore createStore(final Configuration conf, final Path dir)
+ throws IOException {
+ return createWalStore(conf, dir);
}
- public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
- final Path walDir) throws IOException {
- return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
+ public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
+ throws IOException {
+ return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
index 1a4dd86..4c1611a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
@@ -60,7 +60,7 @@ public class TestChildProcedures {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
- procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index ce9795f..bd310fd 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -61,7 +61,7 @@ public class TestProcedureEvents {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procStore.start(1);
procExecutor.start(1, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
index 1a3f898..ed6d512 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -63,7 +63,7 @@ public class TestProcedureExecution {
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
index 0a57efa..6246629 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
@@ -66,7 +66,7 @@ public class TestProcedureMetrics {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
- procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
index ec2e54e..12a8012 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
@@ -67,7 +67,7 @@ public class TestProcedureNonce {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
- procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index b0f6cbc..06f8833 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -69,7 +69,7 @@ public class TestProcedureRecovery {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
- procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 23ca6ba..12b2184 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -68,7 +68,7 @@ public class TestProcedureReplayOrder {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcedureEnv();
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procStore.start(NUM_THREADS);
procExecutor.start(1, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
index 8347dbf..cbe50f2 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
@@ -76,7 +76,7 @@ public class TestStateMachineProcedure {
fs = testDir.getFileSystem(htu.getConfiguration());
logDir = new Path(testDir, "proc-logs");
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index 4882168..017992c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -65,7 +65,7 @@ public class TestYieldProcedures {
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procRunnables = new TestScheduler();
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
procStore, procRunnables);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
index 5554a6c..503850d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
@@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
Path logDir = new Path(testDir, "proc-logs");
System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
fs.delete(logDir, true);
- store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+ store = ProcedureTestingUtility.createWalStore(conf, logDir);
store.start(1);
store.recoverLease();
store.load(new LoadCounter());
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 823972f..1a7fc80 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
System.out.println("Logs directory : " + logDir.toString());
fs.delete(logDir, true);
if ("nosync".equals(syncType)) {
- store = new NoSyncWalProcedureStore(conf, fs, logDir);
+ store = new NoSyncWalProcedureStore(conf, logDir);
} else {
- store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+ store = ProcedureTestingUtility.createWalStore(conf, logDir);
}
store.start(numThreads);
store.recoverLease();
@@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
}
private class NoSyncWalProcedureStore extends WALProcedureStore {
- public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
- final Path logDir) {
- super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+ public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
+ super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
index 610688f..98ec114 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -75,7 +75,7 @@ public class TestStressWALProcedureStore {
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 44c8e12..98b1b7c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -86,7 +86,7 @@ public class TestWALProcedureStore {
setupConfig(htu.getConfiguration());
logDir = new Path(testDir, "proc-logs");
- procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+ procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
procStore.load(new LoadCounter());
@@ -729,7 +729,7 @@ public class TestWALProcedureStore {
assertEquals(procs.length + 1, status.length);
// simulate another active master removing the wals
- procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
+ procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
new WALProcedureStore.LeaseRecovery() {
private int count = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index bb34af6..f7eb02b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable;
public class HFileSystem extends FilterFileSystem {
public static final Log LOG = LogFactory.getLog(HFileSystem.class);
- /** Parameter name for HBase WAL directory */
- public static final String HBASE_WAL_DIR = "hbase.wal.dir";
-
private final FileSystem noChecksumFs; // read hfile data from storage
private final boolean useHBaseChecksum;
private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 04bf01f..1f5462f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoop eventLoop,
- Class<? extends Channel> channelClass) throws IOException {
+ Class<? extends Channel> channelClass)
+ throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoop, channelClass);
@@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper {
} else {
fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
}
+ // After we create the stream but before we attempt to use it at all
+ // ensure that we can provide the level of data safety we're configured
+ // to provide.
+ if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
+ CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+ throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
+ }
final ExecutorService flushExecutor =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3ba31da..7a778f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -53,7 +53,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ClusterStatus.Option;
@@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices {
private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final Path rootDir = FSUtils.getRootDir(conf);
- final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
- MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
- final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final FileSystem walFs = walDir.getFileSystem(conf);
-
- // Create the log directory for the procedure store
- if (!walFs.exists(walDir)) {
- if (!walFs.mkdirs(walDir)) {
- throw new IOException("Unable to mkdir " + walDir);
- }
- }
- // Now that it exists, set the log policy
- FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
- HConstants.DEFAULT_WAL_STORAGE_POLICY);
-
- procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir,
+ procedureStore = new WALProcedureStore(conf,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3b268cb..27987f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -144,10 +144,10 @@ public class MasterFileSystem {
};
final String[] protectedSubLogDirs = new String[] {
- HConstants.HREGION_LOGDIR_NAME,
- HConstants.HREGION_OLDLOGDIR_NAME,
- HConstants.CORRUPT_DIR_NAME,
- MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR
+ HConstants.HREGION_LOGDIR_NAME,
+ HConstants.HREGION_OLDLOGDIR_NAME,
+ HConstants.CORRUPT_DIR_NAME,
+ WALProcedureStore.MASTER_PROCEDURE_LOGDIR
};
// check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 16647d2..495fab6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -24,9 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
public final class MasterProcedureConstants {
private MasterProcedureConstants() {}
- /** Used to construct the name of the log directory for master procedures */
- public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
-
/** Number of threads used by the procedure executor */
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 61c7100..ad54cab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -356,7 +356,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
// Now that it exists, set the storage policy for the entire directory of wal files related to
// this FSHLog instance
- FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
+ CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
HConstants.DEFAULT_WAL_STORAGE_POLICY);
this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
@@ -381,7 +381,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
};
if (failIfWALExists) {
- final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles);
+ final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
if (null != walFiles && 0 != walFiles.length) {
throw new IOException("Target WAL already exists within directory " + walDir);
}
@@ -398,7 +398,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
// (it costs a little x'ing blocks)
final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
- FSUtils.getDefaultBlockSize(this.fs, this.walDir));
+ CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
this.logrollsize = (long) (blocksize
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
@@ -652,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
LOG.info("Archiving " + p + " to " + newPath);
- if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
+ if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
throw new IOException("Unable to rename " + p + " to " + newPath);
}
// Tell our listeners that a log has been archived.
@@ -685,12 +685,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
try {
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
int oldNumEntries = this.numEntries.getAndSet(0);
- final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
+ final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
- LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+ LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
} else {
LOG.info("New WAL " + newPathString);
@@ -767,6 +767,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
cleanOldLogs();
regionsToFlush = findRegionsToForceFlush();
}
+ } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
+ // If the underlying FileSystem can't do what we ask, treat as IO failure so
+ // we'll abort.
+ throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
+ "for details.", exception);
} finally {
closeBarrier.endOp();
assert scope == NullScope.INSTANCE || !scope.isDetached();
@@ -794,7 +799,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* @return may be null if there are no files.
*/
protected FileStatus[] getFiles() throws IOException {
- return FSUtils.listStatus(fs, walDir, ourFiles);
+ return CommonFSUtils.listStatus(fs, walDir, ourFiles);
}
@Override
@@ -833,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
- if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
+ if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
throw new IOException("Unable to rename " + file.getPath() + " to " + p);
}
// Tell our listeners that a log was archived.
@@ -843,7 +848,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
}
- LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir));
+ LOG.debug("Moved " + files.length + " WAL file(s) to " +
+ CommonFSUtils.getPath(this.walArchiveDir));
}
LOG.info("Closed WAL: " + toString());
}
@@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
- protected abstract W createWriterInstance(Path path) throws IOException;
+ protected abstract W createWriterInstance(Path path) throws IOException,
+ CommonFSUtils.StreamLacksCapabilityException;
/**
* @return old wal file size
http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 3747f47..256ced6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter {
}
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
- throws IOException {
+ throws IOException, StreamLacksCapabilityException {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter {
}
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
- short replication, long blockSize) throws IOException;
+ short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
/**
* return the file length after written.