Posted to commits@hbase.apache.org by ps...@apache.org on 2020/04/06 09:54:33 UTC

[hbase] branch master updated: HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)

This is an automated email from the ASF dual-hosted git repository.

psomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 1784938  HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)
1784938 is described below

commit 1784938af7ba2036978fb52552280fe5209b1bf0
Author: Peter Somogyi <ps...@apache.org>
AuthorDate: Mon Apr 6 11:54:19 2020 +0200

    HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../org/apache/hadoop/hbase/mapreduce/JobUtil.java | 16 ++++++
 .../hadoop/hbase/regionserver/CompactionTool.java  | 58 ++++++++++------------
 2 files changed, 42 insertions(+), 32 deletions(-)
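
For context: before this patch the tool obtained its FileSystem via
FileSystem.get(conf), which binds to fs.defaultFS (typically HDFS) regardless
of where hbase.rootdir lives, and it staged its MapReduce input file under
hbase.tmp.dir. The patch resolves the filesystem from hbase.rootdir and stages
the input file on the job staging filesystem instead. A minimal sketch of the
lookup difference, assuming a deployment whose hbase.rootdir points at a
non-HDFS URI (the s3a bucket below is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.CommonFSUtils;

    public class RootDirFsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Old lookup: bound to fs.defaultFS, usually HDFS, no matter
        // where hbase.rootdir actually lives.
        FileSystem defaultFs = FileSystem.get(conf);
        // New lookup: derived from hbase.rootdir, so a rootdir such as
        // s3a://my-bucket/hbase resolves to the S3A filesystem instead.
        FileSystem rootDirFs = CommonFSUtils.getRootDirFileSystem(conf);
        System.out.println("fs.defaultFS  -> " + defaultFs.getUri());
        System.out.println("hbase.rootdir -> " + rootDirFs.getUri());
      }
    }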

diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
index b6d9642..b4f62b3 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java
@@ -53,4 +53,20 @@ public abstract class JobUtil {
       throws IOException, InterruptedException {
     return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
   }
+
+  /**
+   * Initializes the staging directory and returns the qualified path.
+   *
+   * @param conf conf system configuration
+   * @return qualified staging directory path
+   * @throws IOException if the ownership on the staging directory is not as expected
+   * @throws InterruptedException if the thread getting the staging directory is interrupted
+   */
+  public static Path getQualifiedStagingDir(Configuration conf)
+    throws IOException, InterruptedException {
+    Cluster cluster = new Cluster(conf);
+    Path stagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);
+    return cluster.getFileSystem().makeQualified(stagingDir);
+  }
+
 }
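
The makeQualified() step is what lets the caller recover the right filesystem
later: an unqualified staging path carries no scheme or authority, so
Path.getFileSystem(conf) would fall back to fs.defaultFS. A minimal
caller-side sketch, assuming only the method added above (class and output
names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.JobUtil;

    public class QualifiedStagingDirDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The returned path is fully qualified, e.g.
        // hdfs://namenode:8020/tmp/hadoop-yarn/staging/alice/.staging
        Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
        // Because the scheme/authority are present, this resolves to the
        // staging cluster's filesystem, not to fs.defaultFS.
        FileSystem stagingFs = stagingDir.getFileSystem(conf);
        System.out.println(stagingFs.getUri() + " " + stagingDir);
      }
    }
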
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index da06fff..f01e3d9 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -76,7 +78,6 @@ import org.slf4j.LoggerFactory;
 public class CompactionTool extends Configured implements Tool {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);
 
-  private final static String CONF_TMP_DIR = "hbase.tmp.dir";
   private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
   private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
   private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
@@ -89,12 +90,10 @@ public class CompactionTool extends Configured implements Tool {
     private final boolean deleteCompacted;
     private final Configuration conf;
     private final FileSystem fs;
-    private final Path tmpDir;
 
     public CompactionWorker(final FileSystem fs, final Configuration conf) {
       this.conf = conf;
       this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
-      this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
       this.fs = fs;
     }
 
@@ -105,7 +104,8 @@ public class CompactionTool extends Configured implements Tool {
      * @param compactOnce Execute just a single step of compaction.
      * @param major Request major compaction.
      */
-    public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException {
+    public void compact(final Path path, final boolean compactOnce, final boolean major)
+        throws IOException {
       if (isFamilyDir(fs, path)) {
         Path regionDir = path.getParent();
         Path tableDir = regionDir.getParent();
@@ -150,7 +150,7 @@ public class CompactionTool extends Configured implements Tool {
     private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
         final RegionInfo hri, final String familyName, final boolean compactOnce,
         final boolean major) throws IOException {
-      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir);
+      HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
       LOG.info("Compact table=" + htd.getTableName() +
         " region=" + hri.getRegionNameAsString() +
         " family=" + familyName);
@@ -177,19 +177,10 @@ public class CompactionTool extends Configured implements Tool {
       store.close();
     }
 
-    /**
-     * Create a "mock" HStore that uses the tmpDir specified by the user and
-     * the store dir to compact as source.
-     */
     private static HStore getStore(final Configuration conf, final FileSystem fs,
         final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
-        final String familyName, final Path tempDir) throws IOException {
-      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) {
-        @Override
-        public Path getTempDir() {
-          return tempDir;
-        }
-      };
+        final String familyName) throws IOException {
+      HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
       HRegion region = new HRegion(regionFs, null, conf, htd, null);
       return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
     }
@@ -221,7 +212,7 @@ public class CompactionTool extends Configured implements Tool {
       major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
 
       try {
-        FileSystem fs = FileSystem.get(conf);
+        FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
         this.compactor = new CompactionWorker(fs, conf);
       } catch (IOException e) {
         throw new RuntimeException("Could not get the input FileSystem", e);
@@ -301,23 +292,19 @@ public class CompactionTool extends Configured implements Tool {
     * The file is a TextFile with each line corresponding to a
      * store files directory to compact.
      */
-    public static void createInputFile(final FileSystem fs, final Path path,
-        final Set<Path> toCompactDirs) throws IOException {
+    public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
+        final Path path, final Set<Path> toCompactDirs) throws IOException {
       // Extract the list of store dirs
       List<Path> storeDirs = new LinkedList<>();
       for (Path compactDir: toCompactDirs) {
         if (isFamilyDir(fs, compactDir)) {
           storeDirs.add(compactDir);
         } else if (isRegionDir(fs, compactDir)) {
-          for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) {
-            storeDirs.add(familyDir);
-          }
+          storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
         } else if (isTableDir(fs, compactDir)) {
           // Lookup regions
           for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
-            for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) {
-              storeDirs.add(familyDir);
-            }
+            storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
           }
         } else {
           throw new IOException(
@@ -326,7 +313,7 @@ public class CompactionTool extends Configured implements Tool {
       }
 
       // Write Input File
-      FSDataOutputStream stream = fs.create(path);
+      FSDataOutputStream stream = stagingFs.create(path);
       LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
       try {
         final byte[] newLine = Bytes.toBytes("\n");
@@ -337,6 +324,7 @@ public class CompactionTool extends Configured implements Tool {
       } finally {
         stream.close();
       }
+      return storeDirs;
     }
   }
 
@@ -361,15 +349,20 @@ public class CompactionTool extends Configured implements Tool {
     // add dependencies (including HBase ones)
     TableMapReduceUtil.addDependencyJars(job);
 
-    Path stagingDir = JobUtil.getStagingDir(conf);
+    Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
+    FileSystem stagingFs = stagingDir.getFileSystem(conf);
     try {
       // Create input file with the store dirs
       Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
-      CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs);
+      List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
+          inputPath, toCompactDirs);
       CompactionInputFormat.addInputPath(job, inputPath);
 
       // Initialize credential for secure cluster
       TableMapReduceUtil.initCredentials(job);
+      // Despite the method name this will get a delegation token for the filesystem
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        storeDirs.toArray(new Path[0]), conf);
 
       // Start the MR Job and wait
       return job.waitForCompletion(true) ? 0 : 1;
@@ -398,7 +391,7 @@ public class CompactionTool extends Configured implements Tool {
     boolean mapred = false;
 
     Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
 
     try {
       for (int i = 0; i < args.length; ++i) {
@@ -458,14 +451,15 @@ public class CompactionTool extends Configured implements Tool {
     System.err.println("Note: -D properties will be applied to the conf used. ");
     System.err.println("For example: ");
     System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
-    System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
     System.err.println();
     System.err.println("Examples:");
     System.err.println(" To compact the full 'TestTable' using MapReduce:");
-    System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable");
+    System.err.println(" $ hbase " + this.getClass().getName() +
+      " -mapred hdfs://hbase/data/default/TestTable");
     System.err.println();
     System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
-    System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x");
+    System.err.println(" $ hbase " + this.getClass().getName() +
+      " hdfs://hbase/data/default/TestTable/abc/x");
   }
 
   public static void main(String[] args) throws Exception {
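
On the token comment in the hunk above: obtainTokensForNamenodes() is not
HDFS-specific despite its name; it asks each path's FileSystem for a
delegation token and caches it in the job credentials. A minimal standalone
sketch with a hypothetical store path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.security.TokenCache;

    public class StoreDirTokenDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "token-demo");
        // Hypothetical store dir; CompactionTool gets these back from
        // createInputFile(). For each path the method asks the owning
        // FileSystem for a delegation token and stores it in the job's
        // credentials, which covers non-HDFS schemes (s3a, wasb, ...)
        // whose FileSystem implementations issue tokens.
        Path[] storeDirs = new Path[] {
          new Path("s3a://my-bucket/hbase/data/default/TestTable/abc/x")
        };
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), storeDirs, conf);
      }
    }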