You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/02/03 22:46:51 UTC
svn commit: r1564079 - in /hbase/branches/0.96:
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/
Author: mbertozzi
Date: Mon Feb 3 21:46:50 2014
New Revision: 1564079
URL: http://svn.apache.org/r1564079
Log:
HBASE-7963 HBase VerifyReplication not working when security enabled
Modified:
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Mon Feb 3 21:46:50 2014
@@ -83,7 +83,9 @@ public class ReplicationPeersZKImpl exte
@Override
public void init() throws ReplicationException {
try {
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication peers", e);
}
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Feb 3 21:46:50 2014
@@ -329,6 +329,32 @@ public class TableMapReduceUtil {
}
}
+ /**
+  * Obtain an authentication token, for the specified cluster, on behalf of the current user
+  * and add it to the credentials for the given map reduce job. Does nothing when HBase
+  * security is not enabled in the job configuration.
+  *
+  * The quorumAddress is the key to the ZK ensemble, which contains:
+  * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
+  *
+  * @param job The job that requires the permission.
+  * @param quorumAddress string that contains the 3 required configurations
+  * @throws IOException When the authentication token cannot be obtained.
+  */
+ public static void initCredentialsForCluster(Job job, String quorumAddress)
+     throws IOException {
+   UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
+   if (userProvider.isHBaseSecurityEnabled()) {
+     try {
+       // Work on a copy so the job's own configuration keeps pointing at the local cluster.
+       Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
+       ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
+       obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
+     } catch (InterruptedException e) {
+       LOG.info("Interrupted obtaining user authentication token");
+       // Re-assert the interrupt for callers; Thread.interrupted() would clear the flag instead.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
throws IOException, InterruptedException {
Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1564079&r1=1564078&r2=1564079&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Mon Feb 3 21:46:50 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replicati
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -109,35 +110,13 @@ public class VerifyReplication {
HConnectionManager.execute(new HConnectable<Void>(conf) {
@Override
public Void connect(HConnection conn) throws IOException {
- ZooKeeperWatcher localZKW = null;
- ReplicationPeer peer = null;
- try {
- localZKW = new ZooKeeperWatcher(
- conf, "VerifyReplication", new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
- ReplicationPeers rp =
- ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
- rp.init();
- Configuration peerConf = rp.getPeerConf(peerId);
- if (peerConf == null) {
- throw new IOException("Couldn't get peer conf!");
- }
- HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
- scan.setStartRow(value.getRow());
- replicatedScanner = replicatedTable.getScanner(scan);
- } catch (ReplicationException e) {
- throw new IOException(
- "An error occured while trying to connect to the remove peer cluster", e);
- } finally {
- if (peer != null) {
- peer.close();
- }
- if (localZKW != null) {
- localZKW.close();
- }
- }
+ String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+ Configuration peerConf = HBaseConfiguration.create(conf);
+ ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+ HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
+ scan.setStartRow(value.getRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
return null;
}
});
@@ -160,6 +139,38 @@ public class VerifyReplication {
}
}
+ /**
+  * Looks up the configuration of the replication peer identified by {@code peerId} and
+  * returns the ZooKeeper cluster key derived from it.
+  *
+  * @param conf configuration used to open the local ZooKeeper watcher
+  * @return the value of ZKUtil.getZooKeeperClusterKey for the peer's configuration
+  * @throws IOException if no configuration is found for the peer, or if a
+  *     ReplicationException is raised while reading the replication peers
+  */
+ private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+   ZooKeeperWatcher localZKW = null;
+   try {
+     // The watcher is only needed for this lookup, so the Abortable is a no-op.
+     localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
+         new Abortable() {
+           @Override public void abort(String why, Throwable e) {}
+           @Override public boolean isAborted() {return false;}
+         });
+
+     ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+     rp.init();
+
+     Configuration peerConf = rp.getPeerConf(peerId);
+     if (peerConf == null) {
+       throw new IOException("Couldn't get peer conf!");
+     }
+
+     return ZKUtil.getZooKeeperClusterKey(peerConf);
+   } catch (ReplicationException e) {
+     throw new IOException(
+         "An error occurred while trying to connect to the remote peer cluster", e);
+   } finally {
+     // Always release the local ZooKeeper connection, on success and on failure.
+     if (localZKW != null) {
+       localZKW.close();
+     }
+   }
+ }
+
/**
* Sets up the actual job.
*
@@ -184,6 +195,11 @@ public class VerifyReplication {
if (families != null) {
conf.set(NAME+".families", families);
}
+
+ String peerQuorumAddress = getPeerQuorumAddress(conf);
+ conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(VerifyReplication.class);
@@ -200,6 +216,10 @@ public class VerifyReplication {
}
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Verifier.class, null, null, job);
+
+ // Obtain the auth token from peer cluster
+ TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
return job;