Posted to commits@hbase.apache.org by ps...@apache.org on 2020/04/06 09:54:33 UTC
[hbase] branch master updated: HBASE-24111 Enable CompactionTool
executions on non-HDFS filesystems (#1427)
This is an automated email from the ASF dual-hosted git repository.
psomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 1784938 HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)
1784938 is described below
commit 1784938af7ba2036978fb52552280fe5209b1bf0
Author: Peter Somogyi <ps...@apache.org>
AuthorDate: Mon Apr 6 11:54:19 2020 +0200
HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)
Signed-off-by: Josh Elser <el...@apache.org>
---
.../org/apache/hadoop/hbase/mapreduce/JobUtil.java | 16 ++++++
.../hadoop/hbase/regionserver/CompactionTool.java | 58 ++++++++++------------
2 files changed, 42 insertions(+), 32 deletions(-)
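The gist of the change: FileSystem.get(conf) always returns the filesystem behind fs.defaultFS, so the tool failed whenever hbase.rootdir lived on a different filesystem (for example an object store). A minimal sketch of the distinction, assuming a root dir on a non-HDFS scheme such as s3a (the URIs are made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    // Bound to fs.defaultFS, e.g. hdfs://namenode:8020
    FileSystem defaultFs = FileSystem.get(conf);
    // hbase.rootdir may point elsewhere, e.g. s3a://bucket/hbase
    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    // Bound to the root dir's own scheme; this is what the patch's
    // CommonFSUtils.getRootDirFileSystem(conf) resolves instead.
    FileSystem rootFs = rootDir.getFileSystem(conf);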
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
index b6d9642..b4f62b3 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
@@ -53,4 +53,20 @@ public abstract class JobUtil {
throws IOException, InterruptedException {
return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
}
+
+ /**
+ * Initializes the staging directory and returns the qualified path.
+ *
+ * @param conf conf system configuration
+ * @return qualified staging directory path
+ * @throws IOException if the ownership on the staging directory is not as expected
+ * @throws InterruptedException if the thread getting the staging directory is interrupted
+ */
+ public static Path getQualifiedStagingDir(Configuration conf)
+ throws IOException, InterruptedException {
+ Cluster cluster = new Cluster(conf);
+ Path stagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);
+ return cluster.getFileSystem().makeQualified(stagingDir);
+ }
+
}
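For context, the new helper qualifies the staging path with its scheme and authority, so callers can resolve the staging filesystem independently of the filesystem holding the HBase root dir. A hypothetical caller, mirroring what CompactionTool does further down:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.JobUtil;

    Configuration conf = HBaseConfiguration.create();
    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
    // The qualified path carries its scheme, so this may resolve to a
    // different filesystem than the one holding the HBase root dir.
    FileSystem stagingFs = stagingDir.getFileSystem(conf);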
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index da06fff..f01e3d9 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -76,7 +78,6 @@ import org.slf4j.LoggerFactory;
public class CompactionTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);
- private final static String CONF_TMP_DIR = "hbase.tmp.dir";
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
@@ -89,12 +90,10 @@ public class CompactionTool extends Configured implements Tool {
private final boolean deleteCompacted;
private final Configuration conf;
private final FileSystem fs;
- private final Path tmpDir;
public CompactionWorker(final FileSystem fs, final Configuration conf) {
this.conf = conf;
this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
- this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
this.fs = fs;
}
@@ -105,7 +104,8 @@ public class CompactionTool extends Configured implements Tool {
* @param compactOnce Execute just a single step of compaction.
* @param major Request major compaction.
*/
- public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
+ public void compact(final Path path, final boolean compactOnce, final boolean major)
+ throws IOException {
if (isFamilyDir(fs, path)) {
Path regionDir = path.getParent();
Path tableDir = regionDir.getParent();
@@ -150,7 +150,7 @@ public class CompactionTool extends Configured implements Tool {
private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
final RegionInfo hri, final String familyName, final boolean compactOnce,
final boolean major) throws IOException {
- HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
+ HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
LOG.info("Compact table=" + htd.getTableName() +
" region=" + hri.getRegionNameAsString() +
" family=" + familyName);
@@ -177,19 +177,10 @@ public class CompactionTool extends Configured implements Tool {
store.close();
}
- /**
- * Create a "mock" HStore that uses the tmpDir specified by the user and
- * the store dir to compact as source.
- */
private static HStore getStore(final Configuration conf, final FileSystem fs,
final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
- final String familyName, final Path tempDir) throws IOException {
- HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
- @Override
- public Path getTempDir() {
- return tempDir;
- }
- };
+ final String familyName) throws IOException {
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
HRegion region = new HRegion(regionFs, null, conf, htd, null);
return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
}
@@ -221,7 +212,7 @@ public class CompactionTool extends Configured implements Tool {
major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
try {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
this.compactor = new CompactionWorker(fs, conf);
} catch (IOException e) {
throw new RuntimeException("Could not get the input FileSystem", e);
@@ -301,23 +292,19 @@ public class CompactionTool extends Configured implements Tool {
* The file is a TextFile with each line corresponding to a
* store files directory to compact.
*/
- public static void createInputFile(final FileSystem fs, final Path path,
- final Set<Path> toCompactDirs) throws IOException {
+ public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
+ final Path path, final Set<Path> toCompactDirs) throws IOException {
// Extract the list of store dirs
List<Path> storeDirs = new LinkedList<>();
for (Path compactDir: toCompactDirs) {
if (isFamilyDir(fs, compactDir)) {
storeDirs.add(compactDir);
} else if (isRegionDir(fs, compactDir)) {
- for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
- storeDirs.add(familyDir);
- }
+ storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
} else if (isTableDir(fs, compactDir)) {
// Lookup regions
for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
- for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
- storeDirs.add(familyDir);
- }
+ storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
}
} else {
throw new IOException(
@@ -326,7 +313,7 @@ public class CompactionTool extends Configured implements Tool {
}
// Write Input File
- FSDataOutputStream stream = fs.create(path);
+ FSDataOutputStream stream = stagingFs.create(path);
LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
try {
final byte[] newLine = Bytes.toBytes("\n");
@@ -337,6 +324,7 @@ public class CompactionTool extends Configured implements Tool {
} finally {
stream.close();
}
+ return storeDirs;
}
}
@@ -361,15 +349,20 @@ public class CompactionTool extends Configured implements Tool {
// add dependencies (including HBase ones)
TableMapReduceUtil.addDependencyJars(job);
- Path stagingDir = JobUtil.getStagingDir(conf);
+ Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
+ FileSystem stagingFs = stagingDir.getFileSystem(conf);
try {
// Create input file with the store dirs
Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
- CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+ List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
+ inputPath, toCompactDirs);
CompactionInputFormat.addInputPath(job, inputPath);
// Initialize credential for secure cluster
TableMapReduceUtil.initCredentials(job);
+ // Despite the method name this will get a delegation token for the filesystem
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ storeDirs.toArray(new Path[0]), conf);
// Start the MR Job and wait
return job.waitForCompletion(true) ? 0 : 1;
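The TokenCache call matters on secure clusters: obtainTokensForNamenodes fetches one delegation token per distinct filesystem among the given paths, which is what lets the job read store dirs that live outside fs.defaultFS. Spelled out as a sketch (not part of the patch; job, storeDir and conf stand in for the surrounding locals):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    Credentials creds = job.getCredentials();
    // One token per distinct filesystem among the paths; a no-op
    // when security is off or a matching token is already cached.
    TokenCache.obtainTokensForNamenodes(creds, new Path[] { storeDir }, conf);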
@@ -398,7 +391,7 @@ public class CompactionTool extends Configured implements Tool {
boolean mapred = false;
Configuration conf = getConf();
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
try {
for (int i = 0; i < args.length; ++i) {
@@ -458,14 +451,15 @@ public class CompactionTool extends Configured implements Tool {
System.err.println("Note: -D properties will be applied to the conf used. ");
System.err.println("For example: ");
System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
- System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
System.err.println();
System.err.println("Examples:");
System.err.println(" To compact the full 'TestTable' using MapReduce:");
- System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
+ System.err.println(" $ hbase " + this.getClass().getName() +
+ " -mapred hdfs://hbase/data/default/TestTable");
System.err.println();
System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
- System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
+ System.err.println(" $ hbase " + this.getClass().getName() +
+ " hdfs://hbase/data/default/TestTable/abc/x");
}
public static void main(String[] args) throws Exception {
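With this change in place, the tool should accept store paths on other filesystems as well; a hypothetical invocation against an s3a-backed deployment (bucket name made up for illustration):

    $ hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
        -mapred s3a://my-hbase-bucket/data/default/TestTable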