You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/02/26 02:06:16 UTC

svn commit: r1571891 - in /hbase/branches/0.98/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/security/token/ main/java/org/apache/hadoop/hbase/snapshot/ test/java/org/apache/hadoop/hbase/snapshot/

Author: mbertozzi
Date: Wed Feb 26 01:06:16 2014
New Revision: 1571891

URL: http://svn.apache.org/r1571891
Log:
HBASE-10608 Acquire the FS Delegation Token for Secure ExportSnapshot

Added:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1571891&r1=1571890&r2=1571891&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Feb 26 01:06:16 2014
@@ -79,10 +79,10 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.FsDelegationToken;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -110,8 +110,7 @@ public class LoadIncrementalHFiles exten
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
-  private boolean hasForwardedToken;
-  private Token<?> userToken;
+  private FsDelegationToken fsDelegationToken;
   private String bulkToken;
   private UserProvider userProvider;
 
@@ -123,6 +122,7 @@ public class LoadIncrementalHFiles exten
     getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
     this.hbAdmin = new HBaseAdmin(conf);
     this.userProvider = UserProvider.instantiate(conf);
+    this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
   }
@@ -261,19 +261,8 @@ public class LoadIncrementalHFiles exten
       //prepare staging directory and token
       if (userProvider.isHBaseSecurityEnabled()) {
         FileSystem fs = FileSystem.get(getConf());
-        //This condition is here for unit testing
-        //Since delegation token doesn't work in mini cluster
-        if (userProvider.isHadoopSecurityEnabled()) {
-          userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
-                                                         fs.getCanonicalServiceName());
-          if (userToken == null) {
-            hasForwardedToken = false;
-            userToken = fs.getDelegationToken("renewer");
-          } else {
-            hasForwardedToken = true;
-            LOG.info("Use the existing token: " + userToken);
-          }
-        }
+        fsDelegationToken.acquireDelegationToken(fs);
+
         bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
       }
 
@@ -312,13 +301,8 @@ public class LoadIncrementalHFiles exten
 
     } finally {
       if (userProvider.isHBaseSecurityEnabled()) {
-        if (userToken != null && !hasForwardedToken) {
-          try {
-            userToken.cancel(getConf());
-          } catch (Exception e) {
-            LOG.warn("Failed to cancel HDFS delegation token.", e);
-          }
-        }
+        fsDelegationToken.releaseDelegationToken();
+
         if(bulkToken != null) {
           new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
         }
@@ -609,8 +593,8 @@ public class LoadIncrementalHFiles exten
           } else {
             HTable table = new HTable(conn.getConfiguration(), getTableName());
             secureClient = new SecureBulkLoadClient(table);
-            success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken,
-              getLocation().getRegionInfo().getStartKey());
+            success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
+              bulkToken, getLocation().getRegionInfo().getStartKey());
           }
           return success;
         } finally {

Added: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java?rev=1571891&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java (added)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java Wed Feb 26 01:06:16 2014
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Helper class to obtain a filesystem delegation token.
+ * Mainly used by Map-Reduce jobs that need to read/write data on
+ * a remote file-system (e.g. BulkLoad, ExportSnapshot).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FsDelegationToken {
+  private static final Log LOG = LogFactory.getLog(FsDelegationToken.class);
+
+  private final UserProvider userProvider;
+  private final String renewer;
+
+  // true when userToken was found among the current user's tokens instead of being
+  // requested by this helper; such a token is never cancelled on release.
+  private boolean hasForwardedToken = false;
+  private Token<?> userToken = null;
+  // filesystem the token was acquired for; its configuration is used to cancel the token
+  private FileSystem fs = null;
+
+  /**
+   * @param userProvider provides the security settings and the current user
+   * @param renewer the account name that is allowed to renew the token.
+   */
+  public FsDelegationToken(final UserProvider userProvider, final String renewer) {
+    this.userProvider = userProvider;
+    this.renewer = renewer;
+  }
+
+  /**
+   * Acquire the delegation token for the specified filesystem.
+   * Before requesting a new delegation token, tries to find one already available.
+   * Does nothing when Hadoop security is not enabled.
+   *
+   * @param fs the filesystem that requires the delegation token
+   * @throws IOException on fs.getDelegationToken() failure
+   */
+  public void acquireDelegationToken(final FileSystem fs)
+      throws IOException {
+    if (userProvider.isHadoopSecurityEnabled()) {
+      this.fs = fs;
+      userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
+                                                      fs.getCanonicalServiceName());
+      if (userToken == null) {
+        hasForwardedToken = false;
+        userToken = fs.getDelegationToken(renewer);
+      } else {
+        hasForwardedToken = true;
+        LOG.info("Use the existing token: " + userToken);
+      }
+    }
+  }
+
+  /**
+   * Releases a previously acquired delegation token.
+   * A token requested by this helper is cancelled; a failure to cancel is only logged.
+   * A token that was found already available (forwarded) is left untouched.
+   */
+  public void releaseDelegationToken() {
+    // Use the same check as acquireDelegationToken(): guarding on HBase security here
+    // would skip the cancel when only Hadoop security is enabled, leaking the token.
+    if (userProvider.isHadoopSecurityEnabled()) {
+      if (userToken != null && !hasForwardedToken) {
+        try {
+          userToken.cancel(this.fs.getConf());
+        } catch (Exception e) {
+          LOG.warn("Failed to cancel HDFS delegation token: " + userToken, e);
+        }
+      }
+      this.userToken = null;
+      this.fs = null;
+    }
+  }
+
+  /**
+   * @return the user provider this helper was created with
+   */
+  public UserProvider getUserProvider() {
+    return userProvider;
+  }
+
+  /**
+   * @return the account name that is allowed to renew the token.
+   */
+  public String getRenewer() {
+    return renewer;
+  }
+
+  /**
+   * @return the delegation token acquired, or null in case it was not acquired
+   */
+  public Token<?> getUserToken() {
+    return userToken;
+  }
+
+  /**
+   * @return the filesystem the token was acquired for, or null if no token is currently held
+   *         (never acquired, or already released)
+   */
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+}

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java?rev=1571891&r1=1571890&r2=1571891&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java Wed Feb 26 01:06:16 2014
@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.security.FsDelegationToken;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -549,7 +551,8 @@ public final class ExportSnapshot extend
   /**
    * Run Map-Reduce Job to perform the files copy.
    */
-  private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
+  private boolean runCopyJob(final FileSystem inputFs, final Path inputRoot,
+      final FileSystem outputFs, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
       final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
@@ -576,7 +579,20 @@ public final class ExportSnapshot extend
       SequenceFileInputFormat.addInputPath(job, path);
     }
 
-    return job.waitForCompletion(true);
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    FsDelegationToken inputFsToken = new FsDelegationToken(userProvider, "irenewer");
+    FsDelegationToken outputFsToken = new FsDelegationToken(userProvider, "orenewer");
+    try {
+      // Acquire the delegation Tokens
+      inputFsToken.acquireDelegationToken(inputFs);
+      outputFsToken.acquireDelegationToken(outputFs);
+
+      // Run the MR Job
+      return job.waitForCompletion(true);
+    } finally {
+      inputFsToken.releaseDelegationToken();
+      outputFsToken.releaseDelegationToken();
+    }
   }
 
   /**
@@ -689,7 +705,7 @@ public final class ExportSnapshot extend
       if (files.size() == 0) {
         LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
       } else {
-        if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
+        if (!runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum,
             filesUser, filesGroup, filesMode, mappers)) {
           throw new ExportSnapshotException("Snapshot export failed!");
         }

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java?rev=1571891&r1=1571890&r2=1571891&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java Wed Feb 26 01:06:16 2014
@@ -66,7 +66,7 @@ import org.junit.experimental.categories
 public class TestExportSnapshot {
   private final Log LOG = LogFactory.getLog(getClass());
 
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private final static byte[] FAMILY = Bytes.toBytes("cf");
 
@@ -75,15 +75,19 @@ public class TestExportSnapshot {
   private TableName tableName;
   private HBaseAdmin admin;
 
+  /**
+   * Applies the settings used by the export snapshot test setup to the given configuration.
+   *
+   * @param conf the configuration to update in place
+   */
+  public static void setUpBaseConf(Configuration conf) {
+    final int maxMapAttempts = 10;
+
+    // feature switches
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+
+    // intervals and client retries
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+
+    // the same limit is written under both spellings of the map attempts key
+    conf.setInt("mapreduce.map.max.attempts", maxMapAttempts);
+    conf.setInt("mapred.map.max.attempts", maxMapAttempts);
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
-    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
-    TEST_UTIL.getConfiguration().setInt("mapreduce.map.max.attempts", 10);
-    TEST_UTIL.getConfiguration().setInt("mapred.map.max.attempts", 10);
+    setUpBaseConf(TEST_UTIL.getConfiguration());
     TEST_UTIL.startMiniCluster(3);
     TEST_UTIL.startMiniMapReduceCluster();
   }
@@ -180,7 +184,7 @@ public class TestExportSnapshot {
 
   @Test
   public void testConsecutiveExports() throws Exception {
-    Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+    Path copyDir = getLocalDestinationDir();
     testExportFileSystemState(tableName, snapshotName, 2, copyDir, false);
     testExportFileSystemState(tableName, snapshotName, 2, copyDir, true);
     removeExportDir(copyDir);
@@ -239,7 +243,7 @@ public class TestExportSnapshot {
 
   private void testExportFileSystemState(final TableName tableName, final byte[] snapshotName,
       int filesExpected) throws Exception {
-    Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+    Path copyDir = getHdfsDestinationDir();
     testExportFileSystemState(tableName, snapshotName, filesExpected, copyDir, false);
     removeExportDir(copyDir);
   }
@@ -305,7 +309,7 @@ public class TestExportSnapshot {
    */
   private int runExportAndInjectFailures(final byte[] snapshotName, boolean retry)
       throws Exception {
-    Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+    Path copyDir = getLocalDestinationDir();
     URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
     FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
     copyDir = copyDir.makeQualified(fs);
@@ -385,6 +389,19 @@ public class TestExportSnapshot {
     return files;
   }
 
+  /**
+   * Builds a timestamped export destination below "export-test" in the root directory
+   * reported by the master's filesystem, and logs it.
+   *
+   * @return the destination path
+   */
+  private Path getHdfsDestinationDir() {
+    final Path masterRootDir =
+      TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    final Path exportParent = new Path(masterRootDir, "export-test");
+    final Path destination = new Path(exportParent, "export-" + System.currentTimeMillis());
+    LOG.info("HDFS export destination path: " + destination);
+    return destination;
+  }
+
+  /**
+   * Builds a timestamped export destination inside the test utility's data test directory,
+   * and logs it.
+   *
+   * @return the destination path
+   */
+  private Path getLocalDestinationDir() {
+    final String dirName = "local-export-" + System.currentTimeMillis();
+    final Path destination = TEST_UTIL.getDataTestDir(dirName);
+    LOG.info("Local export destination path: " + destination);
+    return destination;
+  }
+
   private void removeExportDir(final Path path) throws IOException {
     FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
     FSUtils.logFileSystemState(fs, path, LOG);

Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java?rev=1571891&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java Wed Feb 26 01:06:16 2014
@@ -0,0 +1,53 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestExportSnapshot using ExportSnapshot in secure mode.
+ * No test methods are declared here: only the class-level setup differs, the tests
+ * themselves are inherited from TestExportSnapshot.
+ */
+@Category(LargeTests.class)
+public class TestSecureExportSnapshot extends TestExportSnapshot {
+  /**
+   * Secure variant of the class-level setup. It has the same name and signature as
+   * TestExportSnapshot.setUpBeforeClass(); per JUnit 4's documented @BeforeClass semantics
+   * a shadowed superclass method is not run, so the base settings are re-applied here
+   * through setUpBaseConf() before the clusters are started.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+
+    // Install the testing user provider on the shared test configuration
+    // (presumably it always reports Hadoop security as enabled - confirm in
+    // HadoopSecurityEnabledUserProviderForTesting).
+    UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+
+    // Let SecureTestUtil apply its security settings to the same configuration;
+    // this has to happen before the clusters below are started from it.
+    SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+
+    // Same start-up calls and arguments as the parent class setup
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniMapReduceCluster();
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME.getName());
+  }
+}