You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/28 08:30:08 UTC

sqoop git commit: SQOOP-2752: Sqoop2: Add impersonation support for kite hdfs

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 b69fb9b68 -> 14eac41fa


SQOOP-2752: Sqoop2: Add impersonation support for kite hdfs

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/14eac41f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/14eac41f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/14eac41f

Branch: refs/heads/sqoop2
Commit: 14eac41fa39238df5677daa46880fb12de2f3c1d
Parents: b69fb9b
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Dec 28 08:29:34 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Dec 28 08:29:34 2015 +0100

----------------------------------------------------------------------
 .../sqoop/error/code/KiteConnectorError.java    |  5 ++
 connector/connector-kite/pom.xml                |  5 ++
 .../sqoop/connector/kite/KiteExtractor.java     | 45 ++++++++----
 .../connector/kite/KiteFromInitializer.java     | 49 ++++++++++---
 .../apache/sqoop/connector/kite/KiteLoader.java | 60 ++++++++++------
 .../sqoop/connector/kite/KiteToDestroyer.java   | 29 ++++++--
 .../sqoop/connector/kite/KiteToInitializer.java | 33 +++++++--
 .../apache/sqoop/connector/kite/KiteUtils.java  | 73 ++++++++++++++++++++
 .../kite/configuration/ConfigUtil.java          | 15 ++++
 .../kite/configuration/LinkConfig.java          |  4 ++
 .../resources/kite-connector-config.properties  |  3 +
 .../sqoop/connector/kite/TestKiteExtractor.java |  3 +-
 .../connector/kite/TestKiteFromInitializer.java | 11 ++-
 .../sqoop/connector/kite/TestKiteLoader.java    |  3 +-
 .../connector/kite/TestKiteToDestroyer.java     |  7 +-
 .../connector/kite/TestKiteToInitializer.java   | 10 ++-
 .../connector/kite/FromRDBMSToKiteTest.java     |  1 +
 17 files changed, 287 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
index 7db9904..170d6cb 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
@@ -33,6 +33,11 @@ public enum KiteConnectorError implements ErrorCode {
   /** Error occurred while creating partitions */
   GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"),
 
+  /** Error occurred while adding configuration directory to classpath */
+  GENERIC_KITE_CONNECTOR_0004("Error occurred while adding configuration directory to classpath"),
+
+  GENERIC_KITE_CONNECTOR_0005("Invalid kite dataset uri"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml
index 0792445..a492c5b 100644
--- a/connector/connector-kite/pom.xml
+++ b/connector/connector-kite/pom.xml
@@ -39,6 +39,11 @@ limitations under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk-hadoop</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
index d93f9b5..e5b2e65 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
@@ -17,12 +17,18 @@
  */
 package org.apache.sqoop.connector.kite;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -47,25 +53,34 @@ public class KiteExtractor extends Extractor<LinkConfiguration,
   }
 
   @Override
-  public void extract(ExtractorContext context, LinkConfiguration linkConfig,
-      FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) {
-    String uri = ConfigUtil.buildDatasetUri(
+  public void extract(final ExtractorContext context, final LinkConfiguration linkConfig,
+      final FromJobConfiguration fromJobConfig, final KiteDatasetPartition partition) {
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, partition.getUri());
     LOG.info("Loading data from " + uri);
 
-    KiteDatasetExecutor executor = getExecutor(uri);
-    DataWriter writer = context.getDataWriter();
-    Object[] array;
-    rowsRead = 0L;
-
     try {
-      while ((array = executor.readRecord()) != null) {
-        // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime).
-        writer.writeArrayRecord(array);
-        rowsRead++;
-      }
-    } finally {
-      executor.closeReader();
+      SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          KiteDatasetExecutor executor = getExecutor(uri);
+          DataWriter writer = context.getDataWriter();
+          Object[] array;
+          rowsRead = 0L;
+
+          try {
+            while ((array = executor.readRecord()) != null) {
+              // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datetime).
+              writer.writeArrayRecord(array);
+              rowsRead++;
+            }
+          } finally {
+            executor.closeReader();
+          }
+          return null;
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
index 4502d59..002286b 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -17,8 +17,11 @@
  */
 package org.apache.sqoop.connector.kite;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
@@ -31,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.Datasets;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Set;
 
 /**
@@ -42,16 +47,33 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration,
   private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class);
 
   @Override
-  public void initialize(InitializerContext context,
-      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
-    String uri = ConfigUtil.buildDatasetUri(
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void initialize(final InitializerContext context,
+      final LinkConfiguration linkConfig, final FromJobConfiguration fromJobConfig) {
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri);
     LOG.debug("Constructed dataset URI: " + uri);
-    if (!Datasets.exists(uri)) {
-      LOG.error("Dataset does not exist");
-      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+    KiteUtils.addConfigDirToClasspath(linkConfig);
+    try {
+      SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          if (!Datasets.exists(uri)) {
+            LOG.error("Dataset does not exist");
+            throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+          }
+
+          if (ConfigUtil.isHdfsJob(fromJobConfig.fromJobConfig)) {
+            // Generate delegation tokens if we are on secured cluster
+            SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration());
+          }
+          return null;
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
     }
   }
+
   @Override
   public Set<String> getJars(InitializerContext context,
       LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
@@ -72,12 +94,23 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration,
     return jars;
   }
 
+  @SuppressWarnings("rawtypes")
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
   public Schema getSchema(InitializerContext context,
       LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
-    String uri = ConfigUtil.buildDatasetUri(
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri);
-    Dataset dataset = Datasets.load(uri);
+    Dataset dataset = null;
+    try {
+      dataset = SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Dataset>() {
+        public Dataset run() {
+          return Datasets.load(uri);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
+    }
     org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema();
     return AvroDataTypeUtil.createSqoopSchema(avroSchema);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index ca0a5c7..ff9aa33 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -17,13 +17,19 @@
  */
 package org.apache.sqoop.connector.kite;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
@@ -55,32 +61,42 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   }
 
   @Override
-  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
-      ToJobConfiguration toJobConfig) throws Exception {
-    String uri = ConfigUtil.buildDatasetUri(
+  public void load(final LoaderContext context, final LinkConfiguration linkConfiguration,
+      final ToJobConfiguration toJobConfig) throws Exception {
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfiguration.linkConfig, toJobConfig.toJobConfig);
-    KiteDatasetExecutor executor = getExecutor(
-        linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
-    LOG.info("Temporary dataset created.");
-
-    DataReader reader = context.getDataReader();
-    Object[] array;
-    boolean success = false;
 
     try {
-      while ((array = reader.readArrayRecord()) != null) {
-        executor.writeRecord(array);
-        rowsWritten++;
-      }
-      LOG.info(rowsWritten + " data record(s) have been written into dataset.");
-      success = true;
-    } finally {
-      executor.closeWriter();
+      SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          KiteDatasetExecutor executor = getExecutor(
+              linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
+          LOG.info("Temporary dataset created.");
+
+          DataReader reader = context.getDataReader();
+          Object[] array;
+          boolean success = false;
+
+          try {
+            while ((array = reader.readArrayRecord()) != null) {
+              executor.writeRecord(array);
+              rowsWritten++;
+            }
+            LOG.info(rowsWritten + " data record(s) have been written into dataset.");
+            success = true;
+          } finally {
+            executor.closeWriter();
 
-      if (!success) {
-        LOG.error("Fail to write data, dataset will be removed.");
-        executor.deleteDataset();
-      }
+            if (!success) {
+              LOG.error("Fail to write data, dataset will be removed.");
+              executor.deleteDataset();
+            }
+          }
+          return null;
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index fb83f2b..0ac1836 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -17,13 +17,19 @@
  */
 package org.apache.sqoop.connector.kite;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;
@@ -42,16 +48,25 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
   private static final Logger LOG = Logger.getLogger(KiteToDestroyer.class);
 
   @Override
-  public void destroy(DestroyerContext context,
-      LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
+  public void destroy(final DestroyerContext context,
+      final LinkConfiguration linkConfig, final ToJobConfiguration toJobConfig) {
     LOG.info("Running Kite connector destroyer");
-    String uri = ConfigUtil.buildDatasetUri(
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
 
-    if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) {
-      destroyHBaseJob(context, uri, toJobConfig);
-    } else {
-      destroyHdfsJob(context, uri, toJobConfig);
+    try {
+      SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) {
+            destroyHBaseJob(context, uri, toJobConfig);
+          } else {
+            destroyHdfsJob(context, uri, toJobConfig);
+          }
+          return null;
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index effab19..4fc2687 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -17,8 +17,11 @@
  */
 package org.apache.sqoop.connector.kite;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -30,6 +33,8 @@ import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 import org.kitesdk.data.Datasets;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Set;
 
 /**
@@ -43,14 +48,30 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
   private static final Logger LOG = Logger.getLogger(KiteToInitializer.class);
 
   @Override
-  public void initialize(InitializerContext context,
-      LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
-    String uri = ConfigUtil.buildDatasetUri(
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void initialize(final InitializerContext context,
+      final LinkConfiguration linkConfig, final ToJobConfiguration toJobConfig) {
+    final String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
     LOG.debug("Constructed dataset URI: " + uri);
-    if (Datasets.exists(uri)) {
-      LOG.error("Overwrite an existing dataset is not expected in new create mode.");
-      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
+    KiteUtils.addConfigDirToClasspath(linkConfig);
+    try {
+      SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          if (Datasets.exists(uri)) {
+            LOG.error("Overwrite an existing dataset is not expected in new create mode.");
+            throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
+          }
+
+          if (ConfigUtil.isHdfsJob(toJobConfig.toJobConfig)) {
+            // Generate delegation tokens if we are on secured cluster
+            SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration());
+          }
+          return null;
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
new file mode 100644
index 0000000..fc6177d
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
+
+/**
+ * Utilities for Kite.
+ */
+public class KiteUtils {
+
+  private static final Logger LOG = Logger.getLogger(KiteUtils.class);
+
+  private static final String DEFAULT_HADOOP_CONF_DIR = "/etc/hadoop/conf";
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public static void addConfigDirToClasspath(final LinkConfiguration linkConfig) {
+    File configDir = new File(getConfDir(linkConfig));
+    try {
+      final Method method = URLClassLoader.class.getDeclaredMethod("addURL", new Class[]{URL.class});
+      AccessController.doPrivileged(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          method.setAccessible(true);
+          return null;
+        }
+      });
+      method.invoke(ClassLoader.getSystemClassLoader(), new Object[]{configDir.toURI().toURL()});
+    } catch (NoSuchMethodException | SecurityException
+        | InvocationTargetException | IllegalAccessException
+        | IllegalArgumentException | MalformedURLException e) {
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0004, e);
+    }
+    LOG.debug("Added file " + configDir + " to classpath");
+  }
+
+  private static String getConfDir(LinkConfiguration linkConfig) {
+    String confDir = linkConfig.linkConfig.confDir;
+    if (StringUtils.isBlank(confDir)) {
+      confDir = DEFAULT_HADOOP_CONF_DIR;
+    }
+    return confDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
index e63bccf..3393c6e 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
@@ -68,4 +68,19 @@ public class ConfigUtil {
     return toJobConfig.uri.startsWith("dataset:hbase:");
   }
 
+  public static boolean isHdfsJob(ToJobConfig toJobConfig) {
+    return toJobConfig.uri.startsWith("dataset:hdfs:");
+  }
+
+  public static boolean isHdfsJob(FromJobConfig fromJobConfig) {
+    return fromJobConfig.uri.startsWith("dataset:hdfs:");
+  }
+
+  public static String removeDatasetPrefix(String uri) {
+    if (uri.startsWith("dataset:")) {
+      return uri.substring("dataset:".length());
+    } else {
+      return uri;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
index ee31f15..508d63e 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
@@ -23,6 +23,7 @@ import org.apache.sqoop.model.Input;
 import org.apache.sqoop.model.Validator;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.DirectoryExistsValidator;
 import org.apache.sqoop.validation.validators.HostAndPortValidator;
 
 @ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
@@ -31,6 +32,9 @@ public class LinkConfig {
   @Input(size = 255)
   public String authority;
 
+  @Input(size = 255, validators = { @Validator(DirectoryExistsValidator.class)})
+  public String confDir;
+
   public static class ConfigValidator extends AbstractValidator<LinkConfig> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/resources/kite-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index c134ac3..f384f3d 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -25,6 +25,9 @@ linkConfig.help = You must supply the information requested in order to create a
 linkConfig.authority.label = HDFS host and port
 linkConfig.authority.help = Optional to override HDFS file system location.
 
+linkConfig.confDir.label = Hadoop conf directory
+linkConfig.confDir.help = Directory with Hadoop configuration files. This directory will be added to the classpath.
+
 # To Job Config
 #
 toJobConfig.label = To Kite Dataset Configuration

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
index c49be92..4da9218 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.connector.kite;
 
+import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.etl.io.DataWriter;
@@ -73,7 +74,7 @@ public class TestKiteExtractor {
     // setup
     Schema schema = new Schema("testExtractor");
     schema.addColumn(new Text("TextCol"));
-    ExtractorContext context = new ExtractorContext(null, writerMock, schema, "test_user");
+    ExtractorContext context = new ExtractorContext(new MutableMapContext(), writerMock, schema, "test_user");
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
     KiteDatasetPartition partition = new KiteDatasetPartition();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
index 6df5d83..5e301bf 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
@@ -18,9 +18,12 @@
 
 package org.apache.sqoop.connector.kite;
 
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.InitializerContext;
 import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -33,10 +36,11 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @PrepareForTest(Datasets.class)
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
 public class TestKiteFromInitializer extends PowerMockTestCase {
 
   private KiteFromInitializer initializer;
+  private InitializerContext initializerContext;
 
   @BeforeMethod(alwaysRun = true)
   public void setUp() {
@@ -44,6 +48,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
     mockStatic(Datasets.class);
 
     initializer = new KiteFromInitializer();
+    initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
   }
 
   @Test
@@ -54,7 +59,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
     when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true);
 
     // exercise
-    initializer.initialize(null, new LinkConfiguration(), jobConfig);
+    initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig);
   }
 
   @Test(expectedExceptions = SqoopException.class)
@@ -65,7 +70,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
     when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false);
 
     // exercise
-    initializer.initialize(null, new LinkConfiguration(), jobConfig);
+    initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
index c5aa1bd..c4ab867 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.connector.kite;
 
+import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -81,7 +82,7 @@ public class TestKiteLoader {
         return null;
       }
     };
-    LoaderContext context = new LoaderContext(null, reader, schema, "test_user");
+    LoaderContext context = new LoaderContext(new MutableMapContext(), reader, schema, "test_user");
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 00b8871..cf26986 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.connector.kite;
 
+import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -38,7 +39,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
 @PrepareForTest({KiteDatasetExecutor.class, Datasets.class})
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
 public class TestKiteToDestroyer extends PowerMockTestCase {
 
   private KiteToDestroyer destroyer;
@@ -78,7 +79,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
   @Test
   public void testDestroyForSuccessfulJob() {
     // setup
-    DestroyerContext context = new DestroyerContext(null, true, null, user);
+    DestroyerContext context = new DestroyerContext(new MutableMapContext(), true, null, user);
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
 
@@ -94,7 +95,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
   @Test
   public void testDestroyForFailedJob() {
     // setup
-    DestroyerContext context = new DestroyerContext(null, false, null, user);
+    DestroyerContext context = new DestroyerContext(new MutableMapContext(), false, null, user);
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
     for (String uri : expectedUris) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index 5230ffe..e5bb667 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -18,9 +18,11 @@
 
 package org.apache.sqoop.connector.kite;
 
+import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;
 import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -36,10 +38,11 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @PrepareForTest(Datasets.class)
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
 public class TestKiteToInitializer extends PowerMockTestCase {
 
   private KiteToInitializer initializer;
+  private InitializerContext initializerContext;
 
   @BeforeMethod(alwaysRun = true)
   public void setUp() {
@@ -47,6 +50,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
     mockStatic(Datasets.class);
 
     initializer = new KiteToInitializer();
+    initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
   }
 
   @Test
@@ -59,7 +63,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
         .thenReturn(false);
 
     // exercise
-    initializer.initialize(null, linkConfig, toJobConfig);
+    initializer.initialize(initializerContext, linkConfig, toJobConfig);
   }
 
   @Test(expectedExceptions = SqoopException.class)
@@ -72,7 +76,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
         .thenReturn(true);
 
     // exercise
-    initializer.initialize(null, linkConfig, toJobConfig);
+    initializer.initialize(initializerContext, linkConfig, toJobConfig);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
index 7b2aced..be9fef1 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
@@ -64,6 +64,7 @@ public class FromRDBMSToKiteTest extends ConnectorTestCase {
     // Kite link
     MLink kiteLink = getClient().createLink("kite-connector");
     kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
+    kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
     saveLink(kiteLink);
 
     // Job creation