You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/08/27 22:29:42 UTC
svn commit: r1377834 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Author: daijy
Date: Mon Aug 27 20:29:41 2012
New Revision: 1377834
URL: http://svn.apache.org/viewvc?rev=1377834&view=rev
Log:
PIG-2821: HBaseStorage should work with secure hbase
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1377834&r1=1377833&r2=1377834&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug 27 20:29:41 2012
@@ -519,6 +519,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2821: HBaseStorage should work with secure hbase (rohini via daijy)
+
PIG-2859: Fix few e2e test failures (rohini via daijy)
PIG-2729: Macro expansion does not use pig.import.search.path - UnitTest borked (johannesch via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1377834&r1=1377833&r2=1377834&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Aug 27 20:29:41 2012
@@ -21,11 +21,13 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
-import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.HashMap;
import java.util.Properties;
@@ -63,12 +65,14 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
@@ -79,7 +83,6 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -134,10 +137,15 @@ public class HBaseStorage extends LoadFu
private final static String CASTER_PROPERTY = "pig.hbase.caster";
private final static String ASTERISK = "*";
private final static String COLON = ":";
-
+ private final static String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";
+ private final static String HBASE_CONFIG_SET = "hbase.config.set";
+ private final static String HBASE_TOKEN_SET = "hbase.token.set";
+
private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
private HTable m_table;
- private Configuration m_conf;
+
+ //Use JobConf to store hbase delegation token
+ private JobConf m_conf;
private RecordReader reader;
private RecordWriter writer;
private TableOutputFormat outputFormat = null;
@@ -147,6 +155,7 @@ public class HBaseStorage extends LoadFu
private final CommandLine configuredOptions_;
private final static Options validOptions_ = new Options();
private final static CommandLineParser parser_ = new GnuParser();
+
private boolean loadRowKey_;
private String delimiter_;
private boolean ignoreWhitespace_;
@@ -250,7 +259,6 @@ public class HBaseStorage extends LoadFu
columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
- m_conf = HBaseConfiguration.create();
String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
@@ -536,12 +544,20 @@ public class HBaseStorage extends LoadFu
@Override
public void setLocation(String location, Job job) throws IOException {
+ Properties udfProps = getUDFProperties();
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
- m_conf = initialiseHBaseClassLoaderResources(job);
+
+ initialiseHBaseClassLoaderResources(job);
+ m_conf = initializeLocalJobConfig(job);
+ String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
+ if (delegationTokenSet == null) {
+ addHBaseDelegationToken(m_conf, job);
+ udfProps.setProperty(HBASE_TOKEN_SET, "true");
+ }
String tablename = location;
- if (location.startsWith("hbase://")){
- tablename = location.substring(8);
+ if (location.startsWith("hbase://")) {
+ tablename = location.substring(8);
}
if (m_table == null) {
m_table = new HTable(m_conf, tablename);
@@ -549,7 +565,7 @@ public class HBaseStorage extends LoadFu
m_table.setScannerCaching(caching_);
m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
- String projectedFields = getUDFProperties().getProperty( projectedFieldsName() );
+ String projectedFields = udfProps.getProperty( projectedFieldsName() );
if (projectedFields != null) {
// update columnInfo_
pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
@@ -574,22 +590,73 @@ public class HBaseStorage extends LoadFu
m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
}
- private Configuration initialiseHBaseClassLoaderResources(Job job) throws IOException {
- Configuration hbaseConfig = initialiseHBaseConfig(job.getConfiguration());
-
+ private void initialiseHBaseClassLoaderResources(Job job) throws IOException {
// Make sure the HBase, ZooKeeper, and Guava jars get shipped.
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
org.apache.hadoop.hbase.client.HTable.class,
com.google.common.collect.Lists.class,
org.apache.zookeeper.ZooKeeper.class);
- return hbaseConfig;
}
- private Configuration initialiseHBaseConfig(Configuration conf) {
- Configuration hbaseConfig = HBaseConfiguration.create();
- ConfigurationUtil.mergeConf(hbaseConfig, conf);
- return hbaseConfig;
+ private JobConf initializeLocalJobConfig(Job job) {
+ Properties udfProps = getUDFProperties();
+ Configuration jobConf = job.getConfiguration();
+ JobConf localConf = new JobConf(jobConf);
+ if (udfProps.containsKey(HBASE_CONFIG_SET)) {
+ for (Entry<Object, Object> entry : udfProps.entrySet()) {
+ localConf.set((String) entry.getKey(), (String) entry.getValue());
+ }
+ } else {
+ Configuration hbaseConf = HBaseConfiguration.create();
+ for (Entry<String, String> entry : hbaseConf) {
+ // JobConf may have some conf overriding ones in hbase-site.xml
+ // So only copy hbase config not in job config to UDFContext
+ // Also avoids copying core-default.xml and core-site.xml
+ // props in hbaseConf to UDFContext which would be redundant.
+ if (jobConf.get(entry.getKey()) == null) {
+ udfProps.setProperty(entry.getKey(), entry.getValue());
+ localConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ udfProps.setProperty(HBASE_CONFIG_SET, "true");
+ }
+ return localConf;
+ }
+
+ /**
+ * Get delegation token from hbase and add it to the Job
+ *
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
+
+ if (!UDFContext.getUDFContext().isFrontend()) {
+ return;
+ }
+
+ if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+ try {
+ // getCurrentUser method is not public in 0.20.2
+ Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
+ UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
+ // Class and method are available only from 0.92 security release
+ Class tokenUtilClass = Class
+ .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
+ Method m2 = tokenUtilClass.getMethod("obtainTokenForJob",
+ new Class[] { Configuration.class, UserGroupInformation.class, Job.class });
+ m2.invoke(null,
+ new Object[] { hbaseConf, currentUser, job });
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Failure loading TokenUtil class, "
+ + "is secure RPC available?", cnfe);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new UndeclaredThrowableException(e,
+ "Unexpected error calling TokenUtil.obtainTokenForJob()");
+ }
+ }
}
@Override
@@ -627,9 +694,12 @@ public class HBaseStorage extends LoadFu
@Override
public OutputFormat getOutputFormat() throws IOException {
if (outputFormat == null) {
- this.outputFormat = new TableOutputFormat();
- m_conf = initialiseHBaseConfig(m_conf);
- this.outputFormat.setConf(m_conf);
+ if (m_conf == null) {
+ throw new IllegalStateException("setStoreLocation has not been called");
+ } else {
+ this.outputFormat = new TableOutputFormat();
+ this.outputFormat.setConf(m_conf);
+ }
}
return outputFormat;
}
@@ -770,7 +840,13 @@ public class HBaseStorage extends LoadFu
schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}
- m_conf = initialiseHBaseClassLoaderResources(job);
+ initialiseHBaseClassLoaderResources(job);
+ m_conf = initializeLocalJobConfig(job);
+ // Not setting a udf property and getting the hbase delegation token
+ // only once like in setLocation as setStoreLocation gets different Job
+ // objects for each call and the last Job passed is the one that is
+ // launched. So we end up getting multiple hbase delegation tokens.
+ addHBaseDelegationToken(m_conf, job);
}
@Override