You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/27 08:21:15 UTC
sqoop git commit: SQOOP-2757: Sqoop2: Add module connector-sdk-hadoop
to hold hadoop specific SDK classes used by the connectors
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 b1561866e -> b69fb9b68
SQOOP-2757: Sqoop2: Add module connector-sdk-hadoop to hold hadoop specific SDK classes used by the connectors
(Dian Fu via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b69fb9b6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b69fb9b6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b69fb9b6
Branch: refs/heads/sqoop2
Commit: b69fb9b6801e70cd4b144ff963c08a3679f1b261
Parents: b156186
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Dec 27 08:20:10 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sun Dec 27 08:20:10 2015 +0100
----------------------------------------------------------------------
connector/connector-hdfs/pom.xml | 6 +
.../sqoop/connector/hdfs/HdfsConstants.java | 2 -
.../sqoop/connector/hdfs/HdfsExtractor.java | 3 +-
.../connector/hdfs/HdfsFromInitializer.java | 4 +-
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 3 +-
.../sqoop/connector/hdfs/HdfsPartitioner.java | 3 +-
.../sqoop/connector/hdfs/HdfsToDestroyer.java | 4 +-
.../sqoop/connector/hdfs/HdfsToInitializer.java | 4 +-
.../connector/hdfs/security/SecurityUtils.java | 146 ------------------
.../hdfs/security/TestSecurityUtils.java | 49 -------
connector/connector-sdk-hadoop/pom.xml | 60 ++++++++
.../hadoop/security/SecurityUtils.java | 147 +++++++++++++++++++
.../hadoop/security/TestSecurityUtils.java | 50 +++++++
connector/pom.xml | 1 +
pom.xml | 5 +
server/pom.xml | 5 +
16 files changed, 280 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index d695750..5996314 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -44,6 +44,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk-hadoop</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index f06300a..39ee4a3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -35,6 +35,4 @@ public final class HdfsConstants extends Constants {
public static final String WORK_DIRECTORY = PREFIX + "work_dir";
public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
-
- public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens";
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 441fe30..a813c47 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 3a0d626..d815d58 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,19 +21,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.log4j.Logger;
-import java.io.IOException;
import java.security.PrivilegedExceptionAction;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index a6551e6..774221a 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -25,16 +25,15 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index d01e932..f35b8e9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -39,13 +39,12 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 858042c..0c62ab1 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -21,17 +21,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
-import java.io.IOException;
import java.security.PrivilegedExceptionAction;
public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 204c978..70e0fde 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,18 +21,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
-import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.UUID;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
deleted file mode 100644
index 0a42936..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.security;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.connector.hdfs.HdfsConstants;
-import org.apache.sqoop.job.etl.TransferableContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Sqoop is designed in a way to abstract connectors from execution engine. Hence the security portion
- * (like generating and distributing delegation tokens) won't happen automatically for us under the hood
- * and we have to do everything manually.
- */
-public class SecurityUtils {
-
- private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
-
- /**
- * Creates proxy user for user who submitted the Sqoop job (e.g. who has issued the "start job" commnad)
- */
- static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException {
- return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
- }
-
- /**
- * Creates proxy user and load's it up with all delegation tokens that we have created ourselves
- */
- static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException {
- UserGroupInformation proxyUser = createProxyUser(context);
- loadDelegationTokensToUGI(proxyUser, context.getContext());
-
- return proxyUser;
- }
-
- /**
- * Generate delegation tokens for current user (this code is suppose to run in doAs) and store them
- * serialized in given mutable context.
- */
- static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException {
- if(!UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Running on unsecured cluster, skipping delegation token generation.");
- return;
- }
-
- // String representation of all tokens that we will create (most likely single one)
- List<String> tokens = new LinkedList<>();
-
- Credentials credentials = new Credentials();
- TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
- for (Token token : credentials.getAllTokens()) {
- LOG.info("Generated token: " + token.toString());
- tokens.add(serializeToken(token));
- }
-
- // The context classes are transferred via "Credentials" rather then with jobconf, so we're not leaking the DT out here
- if(tokens.size() > 0) {
- context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));
- }
- }
-
- /**
- * Loads delegation tokens that we created and serialize into the mutable context
- */
- static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException {
- String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);
- if(tokenList == null) {
- LOG.info("No delegation tokens found");
- return;
- }
-
- for(String stringToken: tokenList.split(" ")) {
- Token token = deserializeToken(stringToken);
- LOG.info("Loaded delegation token: " + token.toString());
- ugi.addToken(token);
- }
- }
-
- /**
- * Serialize given token into String.
- *
- * We'll convert token to byte[] using Writable methods fro I/O and then Base64
- * encode the bytes to a human readable string.
- */
- static public String serializeToken(Token token) throws IOException {
- // Serialize the Token to a byte array
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- token.write(dos);
- baos.flush();
-
- return Base64.encodeBase64String(baos.toByteArray());
- }
-
- /**
- * Deserialize token from given String.
- *
- * See serializeToken for details how the token is expected to be serialized.
- */
- static public Token deserializeToken(String stringToken) throws IOException {
- Token token = new Token();
- byte[] tokenBytes = Base64.decodeBase64(stringToken);
-
- ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
- DataInputStream dis = new DataInputStream(bais);
- token.readFields(dis);
-
- return token;
- }
-
- private SecurityUtils() {
- // Initialization is prohibited
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
deleted file mode 100644
index 713c704..0000000
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.security;
-
-import org.apache.hadoop.io.Text;
-import org.testng.annotations.Test;
-import org.apache.hadoop.security.token.Token;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-public class TestSecurityUtils {
-
- @Test
- public void testTokenSerializationDeserialization() throws Exception {
- byte[] identifier = "identifier".getBytes();
- byte[] password = "password".getBytes();
- Text kind = new Text("kind");
- Text service = new Text("service");
-
- Token token = new Token(identifier, password, kind, service);
- String serializedForm = SecurityUtils.serializeToken(token);
- assertNotNull(serializedForm);
-
- Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
- assertNotNull(deserializedToken);
-
- assertEquals(identifier, deserializedToken.getIdentifier());
- assertEquals(password, deserializedToken.getPassword());
- assertEquals(kind.toString(), deserializedToken.getKind().toString());
- assertEquals(service.toString(), deserializedToken.getService().toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/pom.xml b/connector/connector-sdk-hadoop/pom.xml
new file mode 100644
index 0000000..2793886
--- /dev/null
+++ b/connector/connector-sdk-hadoop/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk-hadoop</artifactId>
+ <name>Sqoop Connector Hadoop Specific SDK</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
new file mode 100644
index 0000000..44f5c03
--- /dev/null
+++ b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hadoop.security;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.job.etl.TransferableContext;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Sqoop is designed in a way to abstract connectors from execution engine. Hence the security portion
+ * (like generating and distributing delegation tokens) won't happen automatically for us under the hood
+ * and we have to do everything manually.
+ */
+public class SecurityUtils {
+
+ private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
+
+ private static final String DELEGATION_TOKENS = "org.apache.sqoop.connector.delegation_tokens";
+
+ /**
+ * Creates proxy user for user who submitted the Sqoop job (e.g. who has issued the "start job" command)
+ */
+ static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException {
+ return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
+ }
+
+ /**
+ * Creates proxy user and loads it up with all delegation tokens that we have created ourselves
+ */
+ static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException {
+ UserGroupInformation proxyUser = createProxyUser(context);
+ loadDelegationTokensToUGI(proxyUser, context.getContext());
+
+ return proxyUser;
+ }
+
+ /**
+ * Generate delegation tokens for current user (this code is supposed to run in doAs) and store them
+ * serialized in given mutable context.
+ */
+ static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException {
+ if(!UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Running on unsecured cluster, skipping delegation token generation.");
+ return;
+ }
+
+ // String representation of all tokens that we will create (most likely single one)
+ List<String> tokens = new LinkedList<>();
+
+ Credentials credentials = new Credentials();
+ TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
+ for (Token token : credentials.getAllTokens()) {
+ LOG.info("Generated token: " + token.toString());
+ tokens.add(serializeToken(token));
+ }
+
+ // The context classes are transferred via "Credentials" rather than with jobconf, so we're not leaking the DT out here
+ if(tokens.size() > 0) {
+ context.setString(DELEGATION_TOKENS, StringUtils.join(tokens, " "));
+ }
+ }
+
+ /**
+ * Loads delegation tokens that we created and serialized into the mutable context
+ */
+ static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException {
+ String tokenList = context.getString(DELEGATION_TOKENS);
+ if(tokenList == null) {
+ LOG.info("No delegation tokens found");
+ return;
+ }
+
+ for(String stringToken: tokenList.split(" ")) {
+ Token token = deserializeToken(stringToken);
+ LOG.info("Loaded delegation token: " + token.toString());
+ ugi.addToken(token);
+ }
+ }
+
+ /**
+ * Serialize given token into String.
+ *
+ * We'll convert token to byte[] using Writable methods for I/O and then Base64
+ * encode the bytes to a human readable string.
+ */
+ static public String serializeToken(Token token) throws IOException {
+ // Serialize the Token to a byte array
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ token.write(dos);
+ baos.flush();
+
+ return Base64.encodeBase64String(baos.toByteArray());
+ }
+
+ /**
+ * Deserialize token from given String.
+ *
+ * See serializeToken for details how the token is expected to be serialized.
+ */
+ static public Token deserializeToken(String stringToken) throws IOException {
+ Token token = new Token();
+ byte[] tokenBytes = Base64.decodeBase64(stringToken);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
+ DataInputStream dis = new DataInputStream(bais);
+ token.readFields(dis);
+
+ return token;
+ }
+
+ private SecurityUtils() {
+ // Initialization is prohibited
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
new file mode 100644
index 0000000..59293b2
--- /dev/null
+++ b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hadoop.security;
+
+import org.apache.hadoop.io.Text;
+import org.testng.annotations.Test;
+import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TestSecurityUtils {
+
+ @Test
+ public void testTokenSerializationDeserialization() throws Exception {
+ byte[] identifier = "identifier".getBytes();
+ byte[] password = "password".getBytes();
+ Text kind = new Text("kind");
+ Text service = new Text("service");
+
+ Token token = new Token(identifier, password, kind, service);
+ String serializedForm = SecurityUtils.serializeToken(token);
+ assertNotNull(serializedForm);
+
+ Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
+ assertNotNull(deserializedToken);
+
+ assertEquals(identifier, deserializedToken.getIdentifier());
+ assertEquals(password, deserializedToken.getPassword());
+ assertEquals(kind.toString(), deserializedToken.getKind().toString());
+ assertEquals(service.toString(), deserializedToken.getService().toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index be8fcb1..7340b37 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -34,6 +34,7 @@ limitations under the License.
<modules>
<module>connector-sdk</module>
+ <module>connector-sdk-hadoop</module>
<module>connector-generic-jdbc</module>
<module>connector-hdfs</module>
<module>connector-kite</module>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7b0dd5..12786ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -347,6 +347,11 @@ limitations under the License.
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index c24183c..c0e40d5 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -78,6 +78,11 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk-hadoop</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-generic-jdbc</artifactId>
<exclusions>