You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/04/11 18:12:22 UTC
svn commit: r1586709 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase:
mapreduce/LoadIncrementalHFiles.java
security/access/SecureBulkLoadEndpoint.java
Author: tedyu
Date: Fri Apr 11 16:12:22 2014
New Revision: 1586709
URL: http://svn.apache.org/r1586709
Log:
HBASE-10902 Make Secure Bulk Load work across remote secure clusters (Jerry He)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1586709&r1=1586708&r2=1586709&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Apr 11 16:12:22 2014
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -113,6 +114,9 @@ public class LoadIncrementalHFiles exten
private int maxFilesPerRegionPerFamily;
private boolean assignSeqIds;
+ // Source filesystem
+ private FileSystem fs;
+ // Source delegation token
private FsDelegationToken fsDelegationToken;
private String bulkToken;
private UserProvider userProvider;
@@ -164,7 +168,7 @@ public class LoadIncrementalHFiles exten
*/
private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
throws IOException {
- FileSystem fs = hfofDir.getFileSystem(getConf());
+ fs = hfofDir.getFileSystem(getConf());
if (!fs.exists(hfofDir)) {
throw new FileNotFoundException("HFileOutputFormat dir " +
@@ -270,10 +274,10 @@ public class LoadIncrementalHFiles exten
return;
}
- //If using secure bulk load
+ //If using secure bulk load, get source delegation token, and
//prepare staging directory and token
if (userProvider.isHBaseSecurityEnabled()) {
- FileSystem fs = FileSystem.get(getConf());
+ // fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
@@ -332,7 +336,7 @@ public class LoadIncrementalHFiles exten
LOG.error(err);
}
}
-
+
if (queue != null && !queue.isEmpty()) {
throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+ "Please check log for more details.");
@@ -510,7 +514,6 @@ public class LoadIncrementalHFiles exten
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
final Path hfilePath = item.hfilePath;
- final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()), getConf());
final byte[] first, last;
@@ -640,23 +643,28 @@ public class LoadIncrementalHFiles exten
//from the staging directory back to original location
//in user directory
if(secureClient != null && !success) {
- FileSystem fs = FileSystem.get(getConf());
- for(Pair<byte[], String> el : famPaths) {
- Path hfileStagingPath = null;
- Path hfileOrigPath = new Path(el.getSecond());
- try {
- hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+ FileSystem targetFs = FileSystem.get(getConf());
+ // Check to see if the source and target filesystems are the same
+ // If they are the same filesystem, we will try to move the files back
+ // because previously we moved them to the staging directory.
+ if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+ for(Pair<byte[], String> el : famPaths) {
+ Path hfileStagingPath = null;
+ Path hfileOrigPath = new Path(el.getSecond());
+ try {
+ hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
hfileOrigPath.getName());
- if(fs.rename(hfileStagingPath, hfileOrigPath)) {
- LOG.debug("Moved back file " + hfileOrigPath + " from " +
- hfileStagingPath);
- } else if(fs.exists(hfileStagingPath)){
+ if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+ LOG.debug("Moved back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ } else if(targetFs.exists(hfileStagingPath)){
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ }
+ } catch(Exception ex) {
LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath);
+ hfileStagingPath, ex);
}
- } catch(Exception ex) {
- LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath, ex);
}
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java?rev=1586709&r1=1586708&r2=1586709&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java Fri Apr 11 16:12:22 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Coprocessor;
@@ -50,7 +51,9 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
@@ -83,8 +86,8 @@ import java.util.List;
* 2. A user writes out data to his secure output directory: /user/foo/data
* 3. A call is made to hbase to create a secret staging directory
* which is globally rwx (777): /user/staging/averylongandrandomdirectoryname
- * 4. The user makes the data world readable and writable, then moves it
- * into the random staging directory, then calls bulkLoadHFiles()
+ * 4. The user moves the data into the random staging directory,
+ * then calls bulkLoadHFiles()
*
* Like delegation tokens the strength of the security lies in the length
* and randomness of the secret directory.
@@ -220,6 +223,21 @@ public class SecureBulkLoadEndpoint exte
}
boolean loaded = false;
if (!bypass) {
+ // Get the target fs (HBase region server fs) delegation token
+ // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+ // the 'request user' necessary token to operate on the target fs.
+ // After this point the 'doAs' user will hold two tokens, one for the source fs
+ // ('request user'), another for the target fs (HBase region server principal).
+ FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ try {
+ targetfsDelegationToken.acquireDelegationToken(fs);
+ } catch (IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ done.run(null);
+ return;
+ }
+ ugi.addToken(targetfsDelegationToken.getUserToken());
+
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
@@ -228,9 +246,6 @@ public class SecureBulkLoadEndpoint exte
Configuration conf = env.getConfiguration();
fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) {
- Path p = new Path(el.getSecond());
- LOG.trace("Setting permission for: " + p);
- fs.setPermission(p, PERM_ALL_ACCESS);
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
if(!fs.exists(stageFamily)) {
fs.mkdirs(stageFamily);
@@ -240,7 +255,7 @@ public class SecureBulkLoadEndpoint exte
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return env.getRegion().bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken));
+ new SecureBulkLoadListener(fs, bulkToken, conf));
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
@@ -303,26 +318,42 @@ public class SecureBulkLoadEndpoint exte
}
private static class SecureBulkLoadListener implements HRegion.BulkLoadListener {
+ // Target filesystem
private FileSystem fs;
private String stagingDir;
+ private Configuration conf;
+ // Source filesystem
+ private FileSystem srcFs = null;
- public SecureBulkLoadListener(FileSystem fs, String stagingDir) {
+ public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
this.stagingDir = stagingDir;
+ this.conf = conf;
}
@Override
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+ if (srcFs == null) {
+ srcFs = FileSystem.get(p.toUri(), conf);
+ }
if(!isFile(p)) {
throw new IOException("Path does not reference a file: " + p);
}
- LOG.debug("Moving " + p + " to " + stageP);
- if(!fs.rename(p, stageP)) {
- throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+ // Check to see if the source and target filesystems are the same
+ if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+ LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
+ "the destination filesystem. Copying file over to destination staging dir.");
+ FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+ }
+ else {
+ LOG.debug("Moving " + p + " to " + stageP);
+ if(!fs.rename(p, stageP)) {
+ throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+ }
}
return stageP.toString();
}
@@ -350,7 +381,7 @@ public class SecureBulkLoadEndpoint exte
* @throws IOException
*/
private boolean isFile(Path p) throws IOException {
- FileStatus status = fs.getFileStatus(p);
+ FileStatus status = srcFs.getFileStatus(p);
boolean isFile = !status.isDir();
try {
isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);