Posted to commits@hbase.apache.org by te...@apache.org on 2014/04/11 18:12:22 UTC

svn commit: r1586709 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase: mapreduce/LoadIncrementalHFiles.java security/access/SecureBulkLoadEndpoint.java

Author: tedyu
Date: Fri Apr 11 16:12:22 2014
New Revision: 1586709

URL: http://svn.apache.org/r1586709
Log:
HBASE-10902 Make Secure Bulk Load work across remote secure clusters (Jerry He)
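For context, a minimal client-side sketch of what this change enables, assuming the HFiles sit on the source cluster's HDFS while the table is served by a separate secure target cluster. The snippet is illustrative only: the namenode host, path, and table name are made up, Kerberos login is assumed to have already happened, and error handling is elided.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

  public class RemoteBulkLoadExample {
    public static void main(String[] args) throws Exception {
      // conf points at the *target* (HBase) cluster; Kerberos login assumed done
      Configuration conf = HBaseConfiguration.create();
      HTable table = new HTable(conf, "my_table");             // hypothetical table name
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // the HFiles live on the remote *source* cluster, hence the fully qualified URI
      loader.doBulkLoad(new Path("hdfs://source-nn:8020/user/foo/hfiles"), table);
      table.close();
    }
  }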


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1586709&r1=1586708&r2=1586709&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Apr 11 16:12:22 2014
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -113,6 +114,9 @@ public class LoadIncrementalHFiles exten
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
+  // Source filesystem
+  private FileSystem fs;
+  // Source delegation token
   private FsDelegationToken fsDelegationToken;
   private String bulkToken;
   private UserProvider userProvider;
@@ -164,7 +168,7 @@ public class LoadIncrementalHFiles exten
    */
   private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
   throws IOException {
-    FileSystem fs = hfofDir.getFileSystem(getConf());
+    fs = hfofDir.getFileSystem(getConf());
 
     if (!fs.exists(hfofDir)) {
       throw new FileNotFoundException("HFileOutputFormat dir " +
@@ -270,10 +274,10 @@ public class LoadIncrementalHFiles exten
         return;
       }
 
-      //If using secure bulk load
+      //If using secure bulk load, get source delegation token, and
       //prepare staging directory and token
       if (userProvider.isHBaseSecurityEnabled()) {
-        FileSystem fs = FileSystem.get(getConf());
+        // fs is the source filesystem
         fsDelegationToken.acquireDelegationToken(fs);
 
         bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
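Put differently, the client now requests its delegation token from whichever filesystem actually holds the HFiles, rather than from the default filesystem of its own configuration. A rough sketch of that flow, assuming hfofDir, conf, userProvider and table are already in scope and the "renewer" string is illustrative:

  // Sketch: token for the source filesystem (where the HFiles are), not fs.defaultFS
  FileSystem sourceFs = hfofDir.getFileSystem(conf);
  FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  fsDelegationToken.acquireDelegationToken(sourceFs);
  // then ask the target cluster for a staging directory, as before
  String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());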
@@ -332,7 +336,7 @@ public class LoadIncrementalHFiles exten
         LOG.error(err);
       }
     }
-    
+
     if (queue != null && !queue.isEmpty()) {
         throw new RuntimeException("Bulk load aborted with some files not yet loaded."
           + "Please check log for more details.");
@@ -510,7 +514,6 @@ public class LoadIncrementalHFiles exten
       final Pair<byte[][], byte[][]> startEndKeys)
       throws IOException {
     final Path hfilePath = item.hfilePath;
-    final FileSystem fs = hfilePath.getFileSystem(getConf());
     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
         new CacheConfig(getConf()), getConf());
     final byte[] first, last;
@@ -640,23 +643,28 @@ public class LoadIncrementalHFiles exten
           //from the staging directory back to original location
           //in user directory
           if(secureClient != null && !success) {
-            FileSystem fs = FileSystem.get(getConf());
-            for(Pair<byte[], String> el : famPaths) {
-              Path hfileStagingPath = null;
-              Path hfileOrigPath = new Path(el.getSecond());
-              try {
-                hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+            FileSystem targetFs = FileSystem.get(getConf());
+            // Check to see if the source and target filesystems are the same
+            // If they are the same filesystem, we will try move the files back
+            // because previously we moved them to the staging directory.
+            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+              for(Pair<byte[], String> el : famPaths) {
+                Path hfileStagingPath = null;
+                Path hfileOrigPath = new Path(el.getSecond());
+                try {
+                  hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                     hfileOrigPath.getName());
-                if(fs.rename(hfileStagingPath, hfileOrigPath)) {
-                  LOG.debug("Moved back file " + hfileOrigPath + " from " +
-                      hfileStagingPath);
-                } else if(fs.exists(hfileStagingPath)){
+                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  } else if(targetFs.exists(hfileStagingPath)){
+                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  }
+                } catch(Exception ex) {
                   LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                      hfileStagingPath);
+                      hfileStagingPath, ex);
                 }
-              } catch(Exception ex) {
-                LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                    hfileStagingPath, ex);
               }
             }
           }

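The abort path above only attempts to move staged files back when source and target are the same HDFS instance; when they differ, the originals were copied into staging (see the listener change below), so there is nothing to restore. A condensed sketch of that decision, with conf, sourceFs, hfileStagingPath, hfileOrigPath and LOG assumed to be in scope:

  // Sketch: restore originals only if source and target are the same HDFS
  FileSystem targetFs = FileSystem.get(conf);
  if (FSHDFSUtils.isSameHdfs(conf, sourceFs, targetFs)) {
    // same cluster: the originals were renamed into staging, so rename them back
    if (!targetFs.rename(hfileStagingPath, hfileOrigPath)) {
      LOG.debug("Unable to move back file " + hfileOrigPath);
    }
  }
  // different clusters: the originals were copied, not moved, so they are still intact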
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java?rev=1586709&r1=1586708&r2=1586709&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java Fri Apr 11 16:12:22 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -50,7 +51,9 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Text;
@@ -83,8 +86,8 @@ import java.util.List;
  * 2. A user writes out data to his secure output directory: /user/foo/data
  * 3. A call is made to hbase to create a secret staging directory
  * which globally rwx (777): /user/staging/averylongandrandomdirectoryname
- * 4. The user makes the data world readable and writable, then moves it
- * into the random staging directory, then calls bulkLoadHFiles()
+ * 4. The user moves the data into the random staging directory,
+ * then calls bulkLoadHFiles()
  *
  * Like delegation tokens the strength of the security lies in the length
  * and randomness of the secret directory.
@@ -220,6 +223,21 @@ public class SecureBulkLoadEndpoint exte
     }
     boolean loaded = false;
     if (!bypass) {
+      // Get the target fs (HBase region server fs) delegation token
+      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+      // the 'request user' necessary token to operate on the target fs.
+      // After this point the 'doAs' user will hold two tokens, one for the source fs
+      // ('request user'), another for the target fs (HBase region server principal).
+      FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+      try {
+        targetfsDelegationToken.acquireDelegationToken(fs);
+      } catch (IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+        done.run(null);
+        return;
+      }
+      ugi.addToken(targetfsDelegationToken.getUserToken());
+
       loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
         @Override
         public Boolean run() {
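The net effect is that the proxied user executes the load holding HDFS tokens for both clusters: the source-filesystem token that arrived with the request, plus a target-filesystem token obtained here under the region server's principal. Roughly, with userProvider, fs (the target filesystem) and ugi assumed to be set up by the endpoint:

  // Sketch: add a target-fs token to the request user's UGI before doAs
  FsDelegationToken targetToken = new FsDelegationToken(userProvider, "renewer");
  targetToken.acquireDelegationToken(fs);          // fs = region server (target) filesystem
  ugi.addToken(targetToken.getUserToken());        // ugi already carries the source-fs token
  Boolean ok = ugi.doAs(new PrivilegedAction<Boolean>() {
    @Override
    public Boolean run() {
      // staging and region.bulkLoadHFiles(...) run here with both tokens available
      return Boolean.TRUE;
    }
  });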
@@ -228,9 +246,6 @@ public class SecureBulkLoadEndpoint exte
             Configuration conf = env.getConfiguration();
             fs = FileSystem.get(conf);
             for(Pair<byte[], String> el: familyPaths) {
-              Path p = new Path(el.getSecond());
-              LOG.trace("Setting permission for: " + p);
-              fs.setPermission(p, PERM_ALL_ACCESS);
               Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
               if(!fs.exists(stageFamily)) {
                 fs.mkdirs(stageFamily);
@@ -240,7 +255,7 @@ public class SecureBulkLoadEndpoint exte
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return env.getRegion().bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken));
+                new SecureBulkLoadListener(fs, bulkToken, conf));
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
@@ -303,26 +318,42 @@ public class SecureBulkLoadEndpoint exte
   }
 
   private static class SecureBulkLoadListener implements HRegion.BulkLoadListener {
+    // Target filesystem
     private FileSystem fs;
     private String stagingDir;
+    private Configuration conf;
+    // Source filesystem
+    private FileSystem srcFs = null;
 
-    public SecureBulkLoadListener(FileSystem fs, String stagingDir) {
+    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
       this.fs = fs;
       this.stagingDir = stagingDir;
+      this.conf = conf;
     }
 
     @Override
     public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+      if (srcFs == null) {
+        srcFs = FileSystem.get(p.toUri(), conf);
+      }
 
       if(!isFile(p)) {
         throw new IOException("Path does not reference a file: " + p);
       }
 
-      LOG.debug("Moving " + p + " to " + stageP);
-      if(!fs.rename(p, stageP)) {
-        throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+      // Check to see if the source and target filesystems are the same
+      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
+            "the destination filesystem. Copying file over to destination staging dir.");
+        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+      }
+      else {
+        LOG.debug("Moving " + p + " to " + stageP);
+        if(!fs.rename(p, stageP)) {
+          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+        }
       }
       return stageP.toString();
     }
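So within the listener, a file arriving from a different filesystem is copied into the staging directory (leaving the source untouched), while a file already on the target HDFS is simply renamed; this pairing is what makes skipping the move-back in LoadIncrementalHFiles safe. A trimmed sketch, with srcFs, fs, conf, p and stageP as in the hunk above:

  // Sketch: rename within one HDFS, copy across clusters
  if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
    // different filesystems: copy, keeping the source file (deleteSource = false)
    FileUtil.copy(srcFs, p, fs, stageP, false, conf);
  } else {
    // same filesystem: a cheap metadata-only rename into staging
    if (!fs.rename(p, stageP)) {
      throw new IOException("Failed to move HFile: " + p + " to " + stageP);
    }
  }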
@@ -350,7 +381,7 @@ public class SecureBulkLoadEndpoint exte
      * @throws IOException
      */
     private boolean isFile(Path p) throws IOException {
-      FileStatus status = fs.getFileStatus(p);
+      FileStatus status = srcFs.getFileStatus(p);
       boolean isFile = !status.isDir();
       try {
         isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);