Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/07/20 18:23:19 UTC
sqoop git commit: SQOOP-2406: Add support for secure mode when importing Parquet files into Hive
Repository: sqoop
Updated Branches:
refs/heads/trunk c6627c04c -> d2c062b20
SQOOP-2406: Add support for secure mode when importing Parquet files into Hive
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2c062b2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2c062b2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2c062b2
Branch: refs/heads/trunk
Commit: d2c062b202a5d44bf5b2c35f98734f9a01cc9b74
Parents: c6627c0
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Jul 20 09:22:12 2015 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Jul 20 09:22:12 2015 -0700
----------------------------------------------------------------------
src/docs/user/hive-notes.txt | 7 ++
.../sqoop/mapreduce/DataDrivenImportJob.java | 3 +-
.../org/apache/sqoop/mapreduce/ParquetJob.java | 109 ++++++++++++++++++-
3 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/docs/user/hive-notes.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt
index a665a20..deee270 100644
--- a/src/docs/user/hive-notes.txt
+++ b/src/docs/user/hive-notes.txt
@@ -29,3 +29,10 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to
+DOUBLE+. In these cases, Sqoop will emit a warning in its log messages
informing you of the loss of precision.
+Parquet Support in Hive
+~~~~~~~~~~~~~~~~~~~~~~~
+
+In order to contact the Hive MetaStore from a MapReduce job, a delegation token will
+be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
+Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
+format may not work.
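
To make the documentation above concrete: before submitting the job, the Hive classes must be loadable and the metastore's SASL setting decides whether a delegation token is needed at all. The following is a minimal pre-flight sketch that mirrors the reflective probing ParquetJob performs below; the class name HiveClasspathCheck is hypothetical and not part of this patch.

import org.apache.hadoop.conf.Configuration;

public class HiveClasspathCheck {
  public static void main(String[] args) {
    try {
      // HiveConf is only visible if HIVE_CONF_DIR/HIVE_HOME put Hive on the classpath.
      Class<?> hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Configuration hiveConf = (Configuration) hiveConfClass
          .getConstructor(Configuration.class, Class.class)
          .newInstance(new Configuration(), Configuration.class);
      // Same property ParquetJob checks to decide whether to fetch a delegation token.
      boolean secure = hiveConf.getBoolean("hive.metastore.sasl.enabled", false);
      System.out.println("HiveConf loaded; secure metastore: " + secure);
    } catch (ClassNotFoundException e) {
      System.err.println("HiveConf not on the classpath; check HIVE_CONF_DIR/HIVE_HOME.");
    } catch (ReflectiveOperationException e) {
      System.err.println("Could not instantiate HiveConf: " + e);
    }
  }
}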
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 388ce7d..260bc29 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -98,7 +99,7 @@ public class DataDrivenImportJob extends ImportJobBase {
AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
} else if (options.getFileLayout()
== SqoopOptions.FileLayout.ParquetFile) {
- Configuration conf = job.getConfiguration();
+ JobConf conf = (JobConf)job.getConfiguration();
// Kite SDK requires an Avro schema to represent the data structure of
// target dataset. If the schema name equals to generated java class name,
// the import will fail. So we use table name as schema name and add a
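
The only change in this file is to hand ParquetJob a JobConf rather than a bare Configuration, because the new token handling relies on conf.getCredentials(). A minimal sketch of that accessor in isolation; the class name CredentialsAccessSketch is illustrative and not part of the patch.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.token.Token;

public class CredentialsAccessSketch {
  public static void main(String[] args) {
    // JobConf exposes getCredentials(); a plain Configuration does not.
    JobConf conf = new JobConf();
    // ParquetJob looks up the metastore token under the same alias Oozie uses.
    Token<?> existing = conf.getCredentials().getToken(new Text("HCat Token"));
    System.out.println(existing == null
        ? "No metastore token yet; ParquetJob would fetch one on a secure cluster."
        : "Token already present (e.g. provided by Oozie); it is reused.");
  }
}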
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index c775ef3..f310419 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -23,6 +23,11 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
@@ -32,6 +37,8 @@ import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
import org.kitesdk.data.spi.SchemaValidationUtil;
import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Map;
/**
* Helper class for setting up a Parquet MapReduce job.
@@ -40,6 +47,14 @@ public final class ParquetJob {
public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
+ public static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
+ public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+ public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
+ // Purposefully choosing the same token alias as the one Oozie chooses.
+ // Make sure we don't generate a new delegation token if oozie
+ // has already generated one.
+ public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
+
private ParquetJob() {
}
@@ -72,9 +87,21 @@ public final class ParquetJob {
* {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
* {@link org.apache.avro.generic.GenericRecord}.
*/
- public static void configureImportJob(Configuration conf, Schema schema,
+ public static void configureImportJob(JobConf conf, Schema schema,
String uri, WriteMode writeMode) throws IOException {
Dataset dataset;
+ Configuration hiveConf = getHiveConf(conf);
+
+ // Add hive delegation token only if we don't already have one.
+ if (uri.startsWith("dataset:hive") && isSecureMetastore(hiveConf)) {
+ // Copy hive configs to job config
+ addHiveConfigs(hiveConf, conf);
+
+ if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
+ addHiveDelegationToken(conf);
+ }
+ }
+
if (Datasets.exists(uri)) {
if (WriteMode.DEFAULT.equals(writeMode)) {
throw new IOException("Destination exists! " + uri);
@@ -113,4 +140,84 @@ public final class ParquetJob {
return Datasets.create(uri, descriptor, GenericRecord.class);
}
+ private static boolean isSecureMetastore(Configuration conf) {
+ return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
+ }
+
+ /**
+ * Dynamically create hive configuration object.
+ * @param conf
+ * @return
+ */
+ private static Configuration getHiveConf(Configuration conf) {
+ try {
+ Class HiveConfClass = Class.forName(HIVE_CONF_CLASS);
+ return ((Configuration)(HiveConfClass.getConstructor(Configuration.class, Class.class)
+ .newInstance(conf, Configuration.class)));
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Could not load " + HIVE_CONF_CLASS
+ + ". Make sure HIVE_CONF_DIR is set correctly.");
+ } catch (Exception ex) {
+ LOG.error("Could not instantiate HiveConf instance.", ex);
+ }
+ return null;
+ }
+
+ /**
+ * Add hive delegation token to credentials store.
+ * @param conf
+ */
+ private static void addHiveDelegationToken(JobConf conf) {
+ // Need to use reflection since there's no compile time dependency on the client libs.
+ Class<?> HiveConfClass;
+ Class<?> HiveMetaStoreClientClass;
+
+ try {
+ HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
+ + " when adding hive delegation token. "
+ + "Make sure HIVE_CONF_DIR is set correctly.", ex);
+ throw new RuntimeException("Couldn't fetch delegation token.", ex);
+ }
+
+ try {
+ HiveConfClass = Class.forName(HIVE_CONF_CLASS);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Could not load " + HIVE_CONF_CLASS
+ + " when adding hive delegation token."
+ + " Make sure HIVE_CONF_DIR is set correctly.", ex);
+ throw new RuntimeException("Couldn't fetch delegation token.", ex);
+ }
+
+ try {
+ Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
+ HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
+ );
+ // getDelegationToken(String kerberosPrincipal)
+ Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
+ Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
+
+ // Load token
+ Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
+ metastoreToken.decodeFromUrlString(tokenStringForm.toString());
+ conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
+
+ LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
+ } catch (Exception ex) {
+ LOG.error("Couldn't fetch delegation token.", ex);
+ throw new RuntimeException("Couldn't fetch delegation token.", ex);
+ }
+ }
+
+ /**
+ * Add hive conf to configuration object without overriding already set properties.
+ * @param hiveConf
+ * @param conf
+ */
+ private static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
+ for (Map.Entry<String, String> item : hiveConf) {
+ conf.setIfUnset(item.getKey(), item.getValue());
+ }
+ }
}
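
Taken together, the reworked entry point can be exercised roughly as follows. This is a hedged sketch, not Sqoop's own call site: the schema and dataset URI are placeholders, WriteMode is assumed to be the nested enum referenced in the configureImportJob signature above, and running it requires Kite and a reachable Hive metastore on the classpath.

import org.apache.avro.Schema;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.ParquetJob;

public class ConfigureImportSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Placeholder one-column schema standing in for Sqoop's generated schema.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"employees\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    // For a dataset:hive URI on a SASL-enabled metastore, this copies the Hive
    // configuration into the JobConf and adds a delegation token under the
    // "HCat Token" alias before creating or reusing the target dataset.
    ParquetJob.configureImportJob(conf, schema,
        "dataset:hive:default/employees", ParquetJob.WriteMode.DEFAULT);
  }
}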