You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/08/27 22:29:42 UTC

svn commit: r1377834 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Author: daijy
Date: Mon Aug 27 20:29:41 2012
New Revision: 1377834

URL: http://svn.apache.org/viewvc?rev=1377834&view=rev
Log:
PIG-2821:  HBaseStorage should work with secure hbase

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1377834&r1=1377833&r2=1377834&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug 27 20:29:41 2012
@@ -519,6 +519,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2821: HBaseStorage should work with secure hbase (rohini via daijy)
+
 PIG-2859: Fix few e2e test failures (rohini via daijy)
 
 PIG-2729: Macro expansion does not use pig.import.search.path - UnitTest borked (johannesch via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1377834&r1=1377833&r2=1377834&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Aug 27 20:29:41 2012
@@ -21,11 +21,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.HashMap;
 import java.util.Properties;
@@ -63,12 +65,14 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadPushDown;
@@ -79,7 +83,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -134,10 +137,15 @@ public class HBaseStorage extends LoadFu
     private final static String CASTER_PROPERTY = "pig.hbase.caster";
     private final static String ASTERISK = "*";
     private final static String COLON = ":";
-    
+    private final static String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";
+    private final static String HBASE_CONFIG_SET = "hbase.config.set";
+    private final static String HBASE_TOKEN_SET = "hbase.token.set";
+
     private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
     private HTable m_table;
-    private Configuration m_conf;
+
+    //Use JobConf to store hbase delegation token
+    private JobConf m_conf;
     private RecordReader reader;
     private RecordWriter writer;
     private TableOutputFormat outputFormat = null;
@@ -147,6 +155,7 @@ public class HBaseStorage extends LoadFu
     private final CommandLine configuredOptions_;
     private final static Options validOptions_ = new Options();
     private final static CommandLineParser parser_ = new GnuParser();
+
     private boolean loadRowKey_;
     private String delimiter_;
     private boolean ignoreWhitespace_;
@@ -250,7 +259,6 @@ public class HBaseStorage extends LoadFu
 
         columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
 
-        m_conf = HBaseConfiguration.create();
         String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
         String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
         if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
@@ -536,12 +544,20 @@ public class HBaseStorage extends LoadFu
 
     @Override
     public void setLocation(String location, Job job) throws IOException {
+        Properties udfProps = getUDFProperties();
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
-        m_conf = initialiseHBaseClassLoaderResources(job);
+
+        initialiseHBaseClassLoaderResources(job);
+        m_conf = initializeLocalJobConfig(job);
+        String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
+        if (delegationTokenSet == null) {
+            addHBaseDelegationToken(m_conf, job);
+            udfProps.setProperty(HBASE_TOKEN_SET, "true");
+        }
 
         String tablename = location;
-        if (location.startsWith("hbase://")){
-           tablename = location.substring(8);
+        if (location.startsWith("hbase://")) {
+            tablename = location.substring(8);
         }
         if (m_table == null) {
             m_table = new HTable(m_conf, tablename);
@@ -549,7 +565,7 @@ public class HBaseStorage extends LoadFu
         m_table.setScannerCaching(caching_);
         m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
 
-        String projectedFields = getUDFProperties().getProperty( projectedFieldsName() );
+        String projectedFields = udfProps.getProperty( projectedFieldsName() );
         if (projectedFields != null) {
             // update columnInfo_
             pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
@@ -574,22 +590,73 @@ public class HBaseStorage extends LoadFu
         m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     }
 
-    private Configuration initialiseHBaseClassLoaderResources(Job job) throws IOException {
-        Configuration hbaseConfig = initialiseHBaseConfig(job.getConfiguration());
-
+    private void initialiseHBaseClassLoaderResources(Job job) throws IOException {
         // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
             org.apache.hadoop.hbase.client.HTable.class,
             com.google.common.collect.Lists.class,
             org.apache.zookeeper.ZooKeeper.class);
 
-        return hbaseConfig;
     }
 
-    private Configuration initialiseHBaseConfig(Configuration conf) {
-        Configuration hbaseConfig = HBaseConfiguration.create();
-        ConfigurationUtil.mergeConf(hbaseConfig, conf);
-        return hbaseConfig;
+    private JobConf initializeLocalJobConfig(Job job) {
+        Properties udfProps = getUDFProperties();
+        Configuration jobConf = job.getConfiguration();
+        JobConf localConf = new JobConf(jobConf);
+        if (udfProps.containsKey(HBASE_CONFIG_SET)) {
+            for (Entry<Object, Object> entry : udfProps.entrySet()) {
+                localConf.set((String) entry.getKey(), (String) entry.getValue());
+            }
+        } else {
+            Configuration hbaseConf = HBaseConfiguration.create();
+            for (Entry<String, String> entry : hbaseConf) {
+                // The JobConf may already hold values that override those in
+                // hbase-site.xml, so copy only the hbase properties missing from
+                // the job config to the UDFContext. This also avoids redundantly
+                // copying core-default.xml and core-site.xml props from hbaseConf.
+                if (jobConf.get(entry.getKey()) == null) {
+                    udfProps.setProperty(entry.getKey(), entry.getValue());
+                    localConf.set(entry.getKey(), entry.getValue());
+                }
+            }
+            udfProps.setProperty(HBASE_CONFIG_SET, "true");
+        }
+        return localConf;
+    }
+
+    /**
+     * Get a delegation token from hbase and add it to the Job. Does nothing when
+     * not on the frontend or when hbase.security.authentication is not "kerberos".
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
+
+        if (!UDFContext.getUDFContext().isFrontend()) {
+            return;
+        }
+
+        if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
+            try {
+                // getCurrentUser method is not public in 0.20.2
+                Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
+                UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
+                // Class and method are available only from 0.92 security release
+                Class tokenUtilClass = Class
+                        .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
+                Method m2 = tokenUtilClass.getMethod("obtainTokenForJob",
+                        new Class[] { Configuration.class, UserGroupInformation.class, Job.class });
+                m2.invoke(null,
+                        new Object[] { hbaseConf, currentUser, job });
+            } catch (ClassNotFoundException cnfe) {
+                throw new RuntimeException("Failure loading TokenUtil class, "
+                        + "is secure RPC available?", cnfe);
+            } catch (RuntimeException re) {
+                throw re;
+            } catch (Exception e) {
+                throw new UndeclaredThrowableException(e,
+                        "Unexpected error calling TokenUtil.obtainTokenForJob()");
+            }
+        }
     }
 
     @Override
@@ -627,9 +694,12 @@ public class HBaseStorage extends LoadFu
     @Override
     public OutputFormat getOutputFormat() throws IOException {
         if (outputFormat == null) {
-            this.outputFormat = new TableOutputFormat();
-            m_conf = initialiseHBaseConfig(m_conf);
-            this.outputFormat.setConf(m_conf);            
+            if (m_conf == null) {
+                throw new IllegalStateException("setStoreLocation has not been called");
+            } else {
+                this.outputFormat = new TableOutputFormat();
+                this.outputFormat.setConf(m_conf);
+            }
         }
         return outputFormat;
     }
@@ -770,7 +840,13 @@ public class HBaseStorage extends LoadFu
             schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
         }
 
-        m_conf = initialiseHBaseClassLoaderResources(job);
+        initialiseHBaseClassLoaderResources(job);
+        m_conf = initializeLocalJobConfig(job);
+        // Unlike setLocation, we do not set a udf property to fetch the hbase
+        // delegation token only once: setStoreLocation gets a different Job
+        // object on each call and the last Job passed is the one that is
+        // launched. As a result we end up getting multiple hbase delegation tokens.
+        addHBaseDelegationToken(m_conf, job);
     }
 
     @Override