You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/02/03 23:25:40 UTC

svn commit: r1564109 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce: TableMapReduceUtil.java replication/VerifyReplication.java

Author: mbertozzi
Date: Mon Feb  3 22:25:40 2014
New Revision: 1564109

URL: http://svn.apache.org/r1564109
Log:
HBASE-7963 HBase VerifyReplication not working when security enabled

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1564109&r1=1564108&r2=1564109&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Feb  3 22:25:40 2014
@@ -289,7 +289,7 @@ public class TableMapReduceUtil {
     }
   }
 
-public static void initCredentials(Job job) throws IOException {
+  public static void initCredentials(Job job) throws IOException {
     UserProvider provider = UserProvider.instantiate(job.getConfiguration());
 
     if (provider.isHadoopSecurityEnabled()) {
@@ -323,6 +323,32 @@ public static void initCredentials(Job j
       }
     }
   }
+  
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job. Nothing is done when HBase
+   * security is not enabled.
+   * <p>
+   * The quorumAddress is the key to the ZK ensemble, which contains:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
+   * @param job The job that requires the permission.
+   * @param quorumAddress string that contains the 3 required configurations
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, String quorumAddress)
+      throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.currentThread().interrupt(); // restore the flag; interrupted() would clear it
+      }
+    }
+  }
 
   private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
       throws IOException, InterruptedException {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1564109&r1=1564108&r2=1564109&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Mon Feb  3 22:25:40 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.zookeeper.KeeperException;
@@ -110,17 +111,13 @@ public class VerifyReplication {
         HConnectionManager.execute(new HConnectable<Void>(conf) {
           @Override
           public Void connect(HConnection conn) throws IOException {
-            try {
-              ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
-                  conn.getZooKeeperWatcher());
-              ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
-              HTable replicatedTable = new HTable(peer.getConfiguration(),
-                  conf.get(NAME+".tableName"));
-              scan.setStartRow(value.getRow());
-              replicatedScanner = replicatedTable.getScanner(scan);
-            } catch (KeeperException e) {
-              throw new IOException("Got a ZK exception", e);
-            }
+            String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+            Configuration peerConf = HBaseConfiguration.create(conf);
+            ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+            HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
+            scan.setStartRow(value.getRow());
+            replicatedScanner = replicatedTable.getScanner(scan);
             return null;
           }
         });
@@ -143,6 +140,26 @@ public class VerifyReplication {
     }
   }
 
+  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+    HConnection conn = HConnectionManager.getConnection(conf);
+    try {
+      ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+          conn.getZooKeeperWatcher());
+
+      ReplicationPeer peer = zk.getPeer(peerId);
+      if (peer == null) {
+        throw new IOException("Couldn't get peer conf!");
+      }
+
+      Configuration peerConf = peer.getConfiguration();
+      return ZKUtil.getZooKeeperClusterKey(peerConf);
+    } catch (KeeperException e) {
+      throw new IOException("Got a ZK exception", e);
+    } finally {
+      conn.close();
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -185,6 +202,11 @@ public class VerifyReplication {
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+
+    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(VerifyReplication.class);
 
@@ -201,6 +223,10 @@ public class VerifyReplication {
     }
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
+
+    // Obtain the auth token from peer cluster
+    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);
     return job;