You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/07/20 18:23:19 UTC

sqoop git commit: SQOOP-2406: Add support for secure mode when importing Parquet files into Hive

Repository: sqoop
Updated Branches:
  refs/heads/trunk c6627c04c -> d2c062b20


SQOOP-2406: Add support for secure mode when importing Parquet files into Hive

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2c062b2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2c062b2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2c062b2

Branch: refs/heads/trunk
Commit: d2c062b202a5d44bf5b2c35f98734f9a01cc9b74
Parents: c6627c0
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jul 20 09:22:12 2015 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jul 20 09:22:12 2015 -0700

----------------------------------------------------------------------
 src/docs/user/hive-notes.txt                    |   7 ++
 .../sqoop/mapreduce/DataDrivenImportJob.java    |   3 +-
 .../org/apache/sqoop/mapreduce/ParquetJob.java  | 109 ++++++++++++++++++-
 3 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/docs/user/hive-notes.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt
index a665a20..deee270 100644
--- a/src/docs/user/hive-notes.txt
+++ b/src/docs/user/hive-notes.txt
@@ -29,3 +29,10 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to
 +DOUBLE+. In these cases, Sqoop will emit a warning in its log messages
 informing you of the loss of precision.
 
+Parquet Support in Hive
+~~~~~~~~~~~~~~~~~~~~~~~
+
+In order to contact the Hive MetaStore from a MapReduce job, a delegation token will
+be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
+Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
+format may not work.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 388ce7d..260bc29 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -98,7 +99,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      Configuration conf = job.getConfiguration();
+      JobConf conf = (JobConf)job.getConfiguration();
       // Kite SDK requires an Avro schema to represent the data structure of
       // target dataset. If the schema name equals to generated java class name,
       // the import will fail. So we use table name as schema name and add a

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index c775ef3..f310419 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -23,6 +23,11 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
@@ -32,6 +37,8 @@ import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 import org.kitesdk.data.spi.SchemaValidationUtil;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Map;
 
 /**
  * Helper class for setting up a Parquet MapReduce job.
@@ -40,6 +47,14 @@ public final class ParquetJob {
 
   public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
 
+  public static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
+  public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
+  // Purposefully choosing the same token alias as the one Oozie chooses,
+  // so that we don't generate a new delegation token if Oozie
+  // has already generated one.
+  public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
+
   private ParquetJob() {
   }
 
@@ -72,9 +87,21 @@ public final class ParquetJob {
    * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
    * {@link org.apache.avro.generic.GenericRecord}.
    */
-  public static void configureImportJob(Configuration conf, Schema schema,
+  public static void configureImportJob(JobConf conf, Schema schema,
       String uri, WriteMode writeMode) throws IOException {
     Dataset dataset;
+    Configuration hiveConf = getHiveConf(conf);
+
+    // Add hive delegation token only if we don't already have one.
+    if (uri.startsWith("dataset:hive") && isSecureMetastore(hiveConf)) {
+      // Copy hive configs to job config
+      addHiveConfigs(hiveConf, conf);
+
+      if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
+        addHiveDelegationToken(conf);
+      }
+    }
+
     if (Datasets.exists(uri)) {
       if (WriteMode.DEFAULT.equals(writeMode)) {
         throw new IOException("Destination exists! " + uri);
@@ -113,4 +140,84 @@ public final class ParquetJob {
     return Datasets.create(uri, descriptor, GenericRecord.class);
   }
 
+  /**
+   * Tell whether the given configuration marks the Hive MetaStore as secured
+   * with SASL.
+   * @param conf configuration to inspect; may be null
+   * @return true only when conf is non-null and has
+   *         {@link #HIVE_METASTORE_SASL_ENABLED} set to true
+   */
+  private static boolean isSecureMetastore(Configuration conf) {
+    if (conf == null) {
+      return false;
+    }
+    return conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
+  }
+
+  /**
+   * Dynamically create a hive configuration object on top of the given
+   * configuration. The HiveConf class is looked up by name, so it only has
+   * to be present on the runtime classpath.
+   * @param conf configuration passed to the HiveConf constructor
+   * @return the new HiveConf instance viewed as a Configuration, or null if
+   *         HiveConf could not be loaded or instantiated (the failure is
+   *         logged, not thrown)
+   */
+  private static Configuration getHiveConf(Configuration conf) {
+    Class<?> hiveConfClass;
+    try {
+      hiveConfClass = Class.forName(HIVE_CONF_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_CONF_CLASS
+          + ". Make sure HIVE_CONF_DIR is set correctly.");
+      return null;
+    }
+
+    try {
+      // Invokes the HiveConf(Configuration, Class) constructor.
+      Object hiveConf = hiveConfClass
+          .getConstructor(Configuration.class, Class.class)
+          .newInstance(conf, Configuration.class);
+      return (Configuration) hiveConf;
+    } catch (Exception ex) {
+      LOG.error("Could not instantiate HiveConf instance.", ex);
+      return null;
+    }
+  }
+
+  /**
+   * Fetch a hive metastore delegation token and add it to the credentials
+   * store of the job under {@link #HIVE_METASTORE_TOKEN_ALIAS}.
+   * The metastore client created for the request is always closed again,
+   * whether or not the token could be fetched.
+   * @param conf job configuration used to build the HiveConf and whose
+   *             credentials receive the token
+   * @throws RuntimeException if the Hive classes cannot be loaded, or if the
+   *             token cannot be fetched or decoded
+   */
+  private static void addHiveDelegationToken(JobConf conf) {
+    // Need to use reflection since there's no compile time dependency on the client libs.
+    Class<?> hiveConfClass;
+    Class<?> hiveMetaStoreClientClass;
+
+    try {
+      hiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
+          + " when adding hive delegation token. "
+          + "Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    try {
+      hiveConfClass = Class.forName(HIVE_CONF_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_CONF_CLASS
+          + " when adding hive delegation token."
+          + " Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    // Kept outside the try block so that the finally clause can close it.
+    Object client = null;
+    try {
+      Object hiveConf = hiveConfClass.getConstructor(Configuration.class, Class.class)
+          .newInstance(conf, Configuration.class);
+      client = hiveMetaStoreClientClass.getConstructor(hiveConfClass).newInstance(hiveConf);
+
+      // getDelegationToken(String kerberosPrincipal)
+      Method getDelegationTokenMethod =
+          hiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
+      Object tokenStringForm = getDelegationTokenMethod.invoke(
+          client, UserGroupInformation.getLoginUser().getShortUserName());
+
+      // Load token
+      Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
+      metastoreToken.decodeFromUrlString(tokenStringForm.toString());
+      conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
+
+      LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
+    } catch (Exception ex) {
+      LOG.error("Couldn't fetch delegation token.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    } finally {
+      if (client != null) {
+        // Release the metastore connection. A failure here must not mask
+        // the outcome of the token request, so it is only logged.
+        try {
+          hiveMetaStoreClientClass.getMethod("close").invoke(client);
+        } catch (Exception ex) {
+          LOG.warn("Could not close hive metastore client.", ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Copy every hive property into the given configuration, keeping any value
+   * that the configuration already defines.
+   * @param hiveConf source of the hive properties
+   * @param conf configuration that receives the properties it does not have yet
+   */
+  private static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
+    for (Map.Entry<String, String> property : hiveConf) {
+      String name = property.getKey();
+      String value = property.getValue();
+      conf.setIfUnset(name, value);
+    }
+  }
 }