You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/28 08:30:08 UTC
sqoop git commit: SQOOP-2752: Sqoop2: Add impersonation support for
kite hdfs
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 b69fb9b68 -> 14eac41fa
SQOOP-2752: Sqoop2: Add impersonation support for kite hdfs
(Dian Fu via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/14eac41f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/14eac41f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/14eac41f
Branch: refs/heads/sqoop2
Commit: 14eac41fa39238df5677daa46880fb12de2f3c1d
Parents: b69fb9b
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Dec 28 08:29:34 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Dec 28 08:29:34 2015 +0100
----------------------------------------------------------------------
.../sqoop/error/code/KiteConnectorError.java | 5 ++
connector/connector-kite/pom.xml | 5 ++
.../sqoop/connector/kite/KiteExtractor.java | 45 ++++++++----
.../connector/kite/KiteFromInitializer.java | 49 ++++++++++---
.../apache/sqoop/connector/kite/KiteLoader.java | 60 ++++++++++------
.../sqoop/connector/kite/KiteToDestroyer.java | 29 ++++++--
.../sqoop/connector/kite/KiteToInitializer.java | 33 +++++++--
.../apache/sqoop/connector/kite/KiteUtils.java | 73 ++++++++++++++++++++
.../kite/configuration/ConfigUtil.java | 15 ++++
.../kite/configuration/LinkConfig.java | 4 ++
.../resources/kite-connector-config.properties | 3 +
.../sqoop/connector/kite/TestKiteExtractor.java | 3 +-
.../connector/kite/TestKiteFromInitializer.java | 11 ++-
.../sqoop/connector/kite/TestKiteLoader.java | 3 +-
.../connector/kite/TestKiteToDestroyer.java | 7 +-
.../connector/kite/TestKiteToInitializer.java | 10 ++-
.../connector/kite/FromRDBMSToKiteTest.java | 1 +
17 files changed, 287 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
index 7db9904..170d6cb 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java
@@ -33,6 +33,11 @@ public enum KiteConnectorError implements ErrorCode {
/** Error occurred while creating partitions */
GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"),
+ /** Error occurred while adding configuration directory to classpath */
+ GENERIC_KITE_CONNECTOR_0004("Error occurred while adding configuration directory to classpath"),
+
+ GENERIC_KITE_CONNECTOR_0005("Invalid kite dataset uri"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml
index 0792445..a492c5b 100644
--- a/connector/connector-kite/pom.xml
+++ b/connector/connector-kite/pom.xml
@@ -39,6 +39,11 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk-hadoop</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
index d93f9b5..e5b2e65 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
@@ -17,12 +17,18 @@
*/
package org.apache.sqoop.connector.kite;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
@@ -47,25 +53,34 @@ public class KiteExtractor extends Extractor<LinkConfiguration,
}
@Override
- public void extract(ExtractorContext context, LinkConfiguration linkConfig,
- FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) {
- String uri = ConfigUtil.buildDatasetUri(
+ public void extract(final ExtractorContext context, final LinkConfiguration linkConfig,
+ final FromJobConfiguration fromJobConfig, final KiteDatasetPartition partition) {
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, partition.getUri());
LOG.info("Loading data from " + uri);
- KiteDatasetExecutor executor = getExecutor(uri);
- DataWriter writer = context.getDataWriter();
- Object[] array;
- rowsRead = 0L;
-
try {
- while ((array = executor.readRecord()) != null) {
- // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime).
- writer.writeArrayRecord(array);
- rowsRead++;
- }
- } finally {
- executor.closeReader();
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ KiteDatasetExecutor executor = getExecutor(uri);
+ DataWriter writer = context.getDataWriter();
+ Object[] array;
+ rowsRead = 0L;
+
+ try {
+ while ((array = executor.readRecord()) != null) {
+ // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime).
+ writer.writeArrayRecord(array);
+ rowsRead++;
+ }
+ } finally {
+ executor.closeReader();
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
index 4502d59..002286b 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -17,8 +17,11 @@
*/
package org.apache.sqoop.connector.kite;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
@@ -31,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
/**
@@ -42,16 +47,33 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration,
private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class);
@Override
- public void initialize(InitializerContext context,
- LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
- String uri = ConfigUtil.buildDatasetUri(
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ public void initialize(final InitializerContext context,
+ final LinkConfiguration linkConfig, final FromJobConfiguration fromJobConfig) {
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri);
LOG.debug("Constructed dataset URI: " + uri);
- if (!Datasets.exists(uri)) {
- LOG.error("Dataset does not exist");
- throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+ KiteUtils.addConfigDirToClasspath(linkConfig);
+ try {
+ SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ if (!Datasets.exists(uri)) {
+ LOG.error("Dataset does not exist");
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+ }
+
+ if (ConfigUtil.isHdfsJob(fromJobConfig.fromJobConfig)) {
+ // Generate delegation tokens if we are on secured cluster
+ SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration());
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
}
}
+
@Override
public Set<String> getJars(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
@@ -72,12 +94,23 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration,
return jars;
}
+ @SuppressWarnings("rawtypes")
@Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
public Schema getSchema(InitializerContext context,
LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
- String uri = ConfigUtil.buildDatasetUri(
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri);
- Dataset dataset = Datasets.load(uri);
+ Dataset dataset = null;
+ try {
+ dataset = SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Dataset>() {
+ public Dataset run() {
+ return Datasets.load(uri);
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
+ }
org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema();
return AvroDataTypeUtil.createSqoopSchema(avroSchema);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index ca0a5c7..ff9aa33 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -17,13 +17,19 @@
*/
package org.apache.sqoop.connector.kite;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
@@ -55,32 +61,42 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
}
@Override
- public void load(LoaderContext context, LinkConfiguration linkConfiguration,
- ToJobConfiguration toJobConfig) throws Exception {
- String uri = ConfigUtil.buildDatasetUri(
+ public void load(final LoaderContext context, final LinkConfiguration linkConfiguration,
+ final ToJobConfiguration toJobConfig) throws Exception {
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfiguration.linkConfig, toJobConfig.toJobConfig);
- KiteDatasetExecutor executor = getExecutor(
- linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
- LOG.info("Temporary dataset created.");
-
- DataReader reader = context.getDataReader();
- Object[] array;
- boolean success = false;
try {
- while ((array = reader.readArrayRecord()) != null) {
- executor.writeRecord(array);
- rowsWritten++;
- }
- LOG.info(rowsWritten + " data record(s) have been written into dataset.");
- success = true;
- } finally {
- executor.closeWriter();
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ KiteDatasetExecutor executor = getExecutor(
+ linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
+ LOG.info("Temporary dataset created.");
+
+ DataReader reader = context.getDataReader();
+ Object[] array;
+ boolean success = false;
+
+ try {
+ while ((array = reader.readArrayRecord()) != null) {
+ executor.writeRecord(array);
+ rowsWritten++;
+ }
+ LOG.info(rowsWritten + " data record(s) have been written into dataset.");
+ success = true;
+ } finally {
+ executor.closeWriter();
- if (!success) {
- LOG.error("Fail to write data, dataset will be removed.");
- executor.deleteDataset();
- }
+ if (!success) {
+ LOG.error("Fail to write data, dataset will be removed.");
+ executor.deleteDataset();
+ }
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index fb83f2b..0ac1836 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -17,13 +17,19 @@
*/
package org.apache.sqoop.connector.kite;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.schema.Schema;
@@ -42,16 +48,25 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
private static final Logger LOG = Logger.getLogger(KiteToDestroyer.class);
@Override
- public void destroy(DestroyerContext context,
- LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
+ public void destroy(final DestroyerContext context,
+ final LinkConfiguration linkConfig, final ToJobConfiguration toJobConfig) {
LOG.info("Running Kite connector destroyer");
- String uri = ConfigUtil.buildDatasetUri(
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig);
- if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) {
- destroyHBaseJob(context, uri, toJobConfig);
- } else {
- destroyHdfsJob(context, uri, toJobConfig);
+ try {
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) {
+ destroyHBaseJob(context, uri, toJobConfig);
+ } else {
+ destroyHdfsJob(context, uri, toJobConfig);
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index effab19..4fc2687 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -17,8 +17,11 @@
*/
package org.apache.sqoop.connector.kite;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -30,6 +33,8 @@ import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
import org.kitesdk.data.Datasets;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Set;
/**
@@ -43,14 +48,30 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
private static final Logger LOG = Logger.getLogger(KiteToInitializer.class);
@Override
- public void initialize(InitializerContext context,
- LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
- String uri = ConfigUtil.buildDatasetUri(
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ public void initialize(final InitializerContext context,
+ final LinkConfiguration linkConfig, final ToJobConfiguration toJobConfig) {
+ final String uri = ConfigUtil.buildDatasetUri(
linkConfig.linkConfig, toJobConfig.toJobConfig);
LOG.debug("Constructed dataset URI: " + uri);
- if (Datasets.exists(uri)) {
- LOG.error("Overwrite an existing dataset is not expected in new create mode.");
- throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
+ KiteUtils.addConfigDirToClasspath(linkConfig);
+ try {
+ SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ if (Datasets.exists(uri)) {
+ LOG.error("Overwrite an existing dataset is not expected in new create mode.");
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
+ }
+
+ if (ConfigUtil.isHdfsJob(toJobConfig.toJobConfig)) {
+ // Generate delegation tokens if we are on secured cluster
+ SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration());
+ }
+ return null;
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
new file mode 100644
index 0000000..fc6177d
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.error.code.KiteConnectorError;
+
+/**
+ * Utilities for Kite.
+ */
+public class KiteUtils {
+
+ private static final Logger LOG = Logger.getLogger(KiteUtils.class);
+
+ private static final String DEFAULT_HADOOP_CONF_DIR = "/etc/hadoop/conf";
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+ public static void addConfigDirToClasspath(final LinkConfiguration linkConfig) {
+ File configDir = new File(getConfDir(linkConfig));
+ try {
+ final Method method = URLClassLoader.class.getDeclaredMethod("addURL", new Class[]{URL.class});
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ method.setAccessible(true);
+ return null;
+ }
+ });
+ method.invoke(ClassLoader.getSystemClassLoader(), new Object[]{configDir.toURI().toURL()});
+ } catch (NoSuchMethodException | SecurityException
+ | InvocationTargetException | IllegalAccessException
+ | IllegalArgumentException | MalformedURLException e) {
+ throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0004, e);
+ }
+ LOG.debug("Added file " + configDir + " to classpath");
+ }
+
+ private static String getConfDir(LinkConfiguration linkConfig) {
+ String confDir = linkConfig.linkConfig.confDir;
+ if (StringUtils.isBlank(confDir)) {
+ confDir = DEFAULT_HADOOP_CONF_DIR;
+ }
+ return confDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
index e63bccf..3393c6e 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
@@ -68,4 +68,19 @@ public class ConfigUtil {
return toJobConfig.uri.startsWith("dataset:hbase:");
}
+ public static boolean isHdfsJob(ToJobConfig toJobConfig) {
+ return toJobConfig.uri.startsWith("dataset:hdfs:");
+ }
+
+ public static boolean isHdfsJob(FromJobConfig fromJobConfig) {
+ return fromJobConfig.uri.startsWith("dataset:hdfs:");
+ }
+
+ public static String removeDatasetPrefix(String uri) {
+ if (uri.startsWith("dataset:")) {
+ return uri.substring("dataset:".length());
+ } else {
+ return uri;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
index ee31f15..508d63e 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
@@ -23,6 +23,7 @@ import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.DirectoryExistsValidator;
import org.apache.sqoop.validation.validators.HostAndPortValidator;
@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
@@ -31,6 +32,9 @@ public class LinkConfig {
@Input(size = 255)
public String authority;
+ @Input(size = 255, validators = { @Validator(DirectoryExistsValidator.class)})
+ public String confDir;
+
public static class ConfigValidator extends AbstractValidator<LinkConfig> {
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/resources/kite-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index c134ac3..f384f3d 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -25,6 +25,9 @@ linkConfig.help = You must supply the information requested in order to create a
linkConfig.authority.label = HDFS host and port
linkConfig.authority.help = Optional to override HDFS file system location.
+linkConfig.confDir.label = Hadoop conf directory
+linkConfig.confDir.help = Directory with Hadoop configuration files. This directory will be added to the classpath.
+
# To Job Config
#
toJobConfig.label = To Kite Dataset Configuration
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
index c49be92..4da9218 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.connector.kite;
+import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.etl.io.DataWriter;
@@ -73,7 +74,7 @@ public class TestKiteExtractor {
// setup
Schema schema = new Schema("testExtractor");
schema.addColumn(new Text("TextCol"));
- ExtractorContext context = new ExtractorContext(null, writerMock, schema, "test_user");
+ ExtractorContext context = new ExtractorContext(new MutableMapContext(), writerMock, schema, "test_user");
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
KiteDatasetPartition partition = new KiteDatasetPartition();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
index 6df5d83..5e301bf 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
@@ -18,9 +18,12 @@
package org.apache.sqoop.connector.kite;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.InitializerContext;
import org.kitesdk.data.Datasets;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -33,10 +36,11 @@ import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@PrepareForTest(Datasets.class)
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
public class TestKiteFromInitializer extends PowerMockTestCase {
private KiteFromInitializer initializer;
+ private InitializerContext initializerContext;
@BeforeMethod(alwaysRun = true)
public void setUp() {
@@ -44,6 +48,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
mockStatic(Datasets.class);
initializer = new KiteFromInitializer();
+ initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
}
@Test
@@ -54,7 +59,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true);
// exercise
- initializer.initialize(null, new LinkConfiguration(), jobConfig);
+ initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig);
}
@Test(expectedExceptions = SqoopException.class)
@@ -65,7 +70,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase {
when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false);
// exercise
- initializer.initialize(null, new LinkConfiguration(), jobConfig);
+ initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
index c5aa1bd..c4ab867 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.connector.kite;
+import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -81,7 +82,7 @@ public class TestKiteLoader {
return null;
}
};
- LoaderContext context = new LoaderContext(null, reader, schema, "test_user");
+ LoaderContext context = new LoaderContext(new MutableMapContext(), reader, schema, "test_user");
LinkConfiguration linkConfig = new LinkConfiguration();
ToJobConfiguration toJobConfig = new ToJobConfiguration();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 00b8871..cf26986 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.connector.kite;
+import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
@@ -38,7 +39,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@PrepareForTest({KiteDatasetExecutor.class, Datasets.class})
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
public class TestKiteToDestroyer extends PowerMockTestCase {
private KiteToDestroyer destroyer;
@@ -78,7 +79,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
@Test
public void testDestroyForSuccessfulJob() {
// setup
- DestroyerContext context = new DestroyerContext(null, true, null, user);
+ DestroyerContext context = new DestroyerContext(new MutableMapContext(), true, null, user);
when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
@@ -94,7 +95,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
@Test
public void testDestroyForFailedJob() {
// setup
- DestroyerContext context = new DestroyerContext(null, false, null, user);
+ DestroyerContext context = new DestroyerContext(new MutableMapContext(), false, null, user);
when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
for (String uri : expectedUris) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index 5230ffe..e5bb667 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -18,9 +18,11 @@
package org.apache.sqoop.connector.kite;
+import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.schema.Schema;
import org.kitesdk.data.Datasets;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -36,10 +38,11 @@ import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@PrepareForTest(Datasets.class)
-@PowerMockIgnore("org.apache.sqoop.common.ErrorCode")
+@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"})
public class TestKiteToInitializer extends PowerMockTestCase {
private KiteToInitializer initializer;
+ private InitializerContext initializerContext;
@BeforeMethod(alwaysRun = true)
public void setUp() {
@@ -47,6 +50,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
mockStatic(Datasets.class);
initializer = new KiteToInitializer();
+ initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
}
@Test
@@ -59,7 +63,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
.thenReturn(false);
// exercise
- initializer.initialize(null, linkConfig, toJobConfig);
+ initializer.initialize(initializerContext, linkConfig, toJobConfig);
}
@Test(expectedExceptions = SqoopException.class)
@@ -72,7 +76,7 @@ public class TestKiteToInitializer extends PowerMockTestCase {
.thenReturn(true);
// exercise
- initializer.initialize(null, linkConfig, toJobConfig);
+ initializer.initialize(initializerContext, linkConfig, toJobConfig);
}
@Test
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
index 7b2aced..be9fef1 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java
@@ -64,6 +64,7 @@ public class FromRDBMSToKiteTest extends ConnectorTestCase {
// Kite link
MLink kiteLink = getClient().createLink("kite-connector");
kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority());
+ kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath());
saveLink(kiteLink);
// Job creation