You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/02/03 23:25:40 UTC
svn commit: r1564109 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce:
TableMapReduceUtil.java replication/VerifyReplication.java
Author: mbertozzi
Date: Mon Feb 3 22:25:40 2014
New Revision: 1564109
URL: http://svn.apache.org/r1564109
Log:
HBASE-7963 HBase VerifyReplication not working when security enabled
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1564109&r1=1564108&r2=1564109&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Feb 3 22:25:40 2014
@@ -289,7 +289,7 @@ public class TableMapReduceUtil {
}
}
-public static void initCredentials(Job job) throws IOException {
+ public static void initCredentials(Job job) throws IOException {
UserProvider provider = UserProvider.instantiate(job.getConfiguration());
if (provider.isHadoopSecurityEnabled()) {
@@ -323,6 +323,32 @@ public static void initCredentials(Job j
}
}
}
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * The quorumAddress is the key to the ZK ensemble, which contains:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
+   *
+   * Nothing is done when HBase security is not enabled. If the calling thread is interrupted
+   * while the token is being obtained, the event is logged, the thread's interrupt status is
+   * restored and the method returns without throwing.
+   *
+   * @param job The job that requires the permission.
+   * @param quorumAddress string that contains the 3 required configurations
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, String quorumAddress)
+      throws IOException {
+    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+    if (userProvider.isHBaseSecurityEnabled()) {
+      try {
+        // Start from a copy of the job configuration and overlay the peer's ZK cluster key.
+        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+        obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted obtaining user authentication token");
+        // Re-assert the interrupt so callers can observe it; Thread.interrupted() would
+        // clear the flag instead of restoring it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
throws IOException, InterruptedException {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1564109&r1=1564108&r2=1564109&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Mon Feb 3 22:25:40 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.zookeeper.KeeperException;
@@ -110,17 +111,13 @@ public class VerifyReplication {
HConnectionManager.execute(new HConnectable<Void>(conf) {
@Override
public Void connect(HConnection conn) throws IOException {
- try {
- ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
- conn.getZooKeeperWatcher());
- ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
- HTable replicatedTable = new HTable(peer.getConfiguration(),
- conf.get(NAME+".tableName"));
- scan.setStartRow(value.getRow());
- replicatedScanner = replicatedTable.getScanner(scan);
- } catch (KeeperException e) {
- throw new IOException("Got a ZK exception", e);
- }
+ String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+ Configuration peerConf = HBaseConfiguration.create(conf);
+ ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+ HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
+ scan.setStartRow(value.getRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
return null;
}
});
@@ -143,6 +140,26 @@ public class VerifyReplication {
}
}
+ private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+ HConnection conn = HConnectionManager.getConnection(conf);
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
+
+ ReplicationPeer peer = zk.getPeer(peerId);
+ if (peer == null) {
+ throw new IOException("Couldn't get peer conf!");
+ }
+
+ Configuration peerConf = peer.getConfiguration();
+ return ZKUtil.getZooKeeperClusterKey(peerConf);
+ } catch (KeeperException e) {
+ throw new IOException("Got a ZK exception", e);
+ } finally {
+ conn.close();
+ }
+ }
+
/**
* Sets up the actual job.
*
@@ -185,6 +202,11 @@ public class VerifyReplication {
if (families != null) {
conf.set(NAME+".families", families);
}
+
+ String peerQuorumAddress = getPeerQuorumAddress(conf);
+ conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(VerifyReplication.class);
@@ -201,6 +223,10 @@ public class VerifyReplication {
}
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Verifier.class, null, null, job);
+
+ // Obtain the auth token from peer cluster
+ TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
return job;