You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/02/03 22:46:51 UTC

svn commit: r1564079 - in /hbase/branches/0.96: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/

Author: mbertozzi
Date: Mon Feb  3 21:46:50 2014
New Revision: 1564079

URL: http://svn.apache.org/r1564079
Log:
HBASE-7963 HBase VerifyReplication not working when security enabled

Modified:
    hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java

Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Mon Feb  3 21:46:50 2014
@@ -83,7 +83,9 @@ public class ReplicationPeersZKImpl exte
   @Override
   public void init() throws ReplicationException {
     try {
-      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication peers", e);
     }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Feb  3 21:46:50 2014
@@ -329,6 +329,32 @@ public class TableMapReduceUtil {
     }
   }
 
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * The quorumAddress is the key to the ZK ensemble, which contains:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
+   *
+   * @param job The job that requires the permission.
+   * @param quorumAddress string that contains the 3 required configuratins
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, String quorumAddress)
+      throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted obtaining user authentication token");
+        Thread.interrupted();
+      }
+    }
+  }
+
   private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
       throws IOException, InterruptedException {
     Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Mon Feb  3 21:46:50 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replicati
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
@@ -109,35 +110,13 @@ public class VerifyReplication {
         HConnectionManager.execute(new HConnectable<Void>(conf) {
           @Override
           public Void connect(HConnection conn) throws IOException {
-            ZooKeeperWatcher localZKW = null;
-            ReplicationPeer peer = null;
-            try {
-              localZKW = new ZooKeeperWatcher(
-                conf, "VerifyReplication", new Abortable() {
-                @Override public void abort(String why, Throwable e) {}
-                @Override public boolean isAborted() {return false;}
-              });
-              ReplicationPeers rp =
-                  ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
-              rp.init();
-              Configuration peerConf = rp.getPeerConf(peerId);
-              if (peerConf == null) {
-                throw new IOException("Couldn't get peer conf!");
-              }
-              HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
-              scan.setStartRow(value.getRow());
-              replicatedScanner = replicatedTable.getScanner(scan);
-            } catch (ReplicationException e) {
-              throw new IOException(
-                  "An error occured while trying to connect to the remove peer cluster", e);
-            } finally {
-              if (peer != null) {
-                peer.close();
-              }
-              if (localZKW != null) {
-                localZKW.close();
-              }
-            }
+            String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+            Configuration peerConf = HBaseConfiguration.create(conf);
+            ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+            HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
+            scan.setStartRow(value.getRow());
+            replicatedScanner = replicatedTable.getScanner(scan);
             return null;
           }
         });
@@ -160,6 +139,38 @@ public class VerifyReplication {
     }
   }
 
+  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+    ZooKeeperWatcher localZKW = null;
+    ReplicationPeer peer = null;
+    try {
+      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
+          new Abortable() {
+            @Override public void abort(String why, Throwable e) {}
+            @Override public boolean isAborted() {return false;}
+          });
+
+      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+      rp.init();
+
+      Configuration peerConf = rp.getPeerConf(peerId);
+      if (peerConf == null) {
+        throw new IOException("Couldn't get peer conf!");
+      }
+
+      return ZKUtil.getZooKeeperClusterKey(peerConf);
+    } catch (ReplicationException e) {
+      throw new IOException(
+          "An error occured while trying to connect to the remove peer cluster", e);
+    } finally {
+      if (peer != null) {
+        peer.close();
+      }
+      if (localZKW != null) {
+        localZKW.close();
+      }
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -184,6 +195,11 @@ public class VerifyReplication {
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+
+    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(VerifyReplication.class);
 
@@ -200,6 +216,10 @@ public class VerifyReplication {
     }
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
+
+    // Obtain the auth token from peer cluster
+    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);
     return job;