Posted to commits@sqoop.apache.org by hs...@apache.org on 2015/12/02 21:15:23 UTC
sqoop git commit: SQOOP-2709. Sqoop2: HDFS: Make sure impersonation works on secured cluster.
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 e2fc4a75e -> 408e3d566
SQOOP-2709. Sqoop2: HDFS: Make sure impersonation works on secured cluster.
(Jarcec via Hari)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/408e3d56
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/408e3d56
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/408e3d56
Branch: refs/heads/sqoop2
Commit: 408e3d5663d511b99511641a5bcf66a7daf6a7af
Parents: e2fc4a7
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Dec 2 12:14:21 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Dec 2 12:14:21 2015 -0800
----------------------------------------------------------------------
.../sqoop/connector/hdfs/HdfsConstants.java | 2 +
.../sqoop/connector/hdfs/HdfsExtractor.java | 3 +-
.../connector/hdfs/HdfsFromInitializer.java | 7 +-
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 4 +-
.../sqoop/connector/hdfs/HdfsPartitioner.java | 4 +-
.../sqoop/connector/hdfs/HdfsToDestroyer.java | 4 +-
.../sqoop/connector/hdfs/HdfsToInitializer.java | 8 +-
.../connector/hdfs/security/SecurityUtils.java | 146 +++++++++++++++++++
.../hdfs/security/TestSecurityUtils.java | 49 +++++++
9 files changed, 217 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
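In outline, the change works in two phases. The initializers run inside the Sqoop server, where a Kerberos login is available, so they obtain HDFS delegation tokens there and store them, Base64-serialized, in the job context. The executor-side classes (extractor, partitioner, loader, destroyer) later recreate the proxy user and load those tokens back before entering doAs(). A minimal sketch of the two halves, using the names from the diff below (the surrounding scaffolding, such as where 'path' and 'configuration' come from, is assumed):

  // Uses: org.apache.hadoop.fs.FileSystem, java.security.PrivilegedExceptionAction
  // Server side (initializer): generate tokens while Kerberos credentials are at hand.
  SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
      // 'path' and 'configuration' come from the job being initialized.
      SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
      return null;
    }
  });

  // Executor side (extractor, loader, ...): rebuild the proxy user with the stored tokens.
  SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
      FileSystem fs = FileSystem.get(configuration); // authenticates via the loaded tokens
      return null;
    }
  });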
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index 39ee4a3..f06300a 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -35,4 +35,6 @@ public final class HdfsConstants extends Constants {
public static final String WORK_DIRECTORY = PREFIX + "work_dir";
public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
+
+ public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens";
}
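The new constant is the context key under which the serialized tokens travel from the initializer to the executors. A sketch of both ends of that channel (the real read/write paths live in SecurityUtils further down):

  // Write side: join the Base64-encoded tokens with spaces and stash them in the mutable context.
  context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));

  // Read side: the key is absent on an unsecured cluster, so a null check is required.
  String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);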
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 583acdd..441fe30 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -37,6 +37,7 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
@@ -60,7 +61,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
@Override
public void extract(final ExtractorContext context, final LinkConfiguration linkConfiguration, final FromJobConfiguration jobConfiguration, final HdfsPartition partition) {
try {
- UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
HdfsUtils.contextToConfiguration(context.getContext(), conf);
dataWriter = context.getDataWriter();
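The same one-line swap is repeated in the loader, partitioner, and TO-side destroyer below. The practical difference on a Kerberized cluster: the proxy UGI created in the executor JVM carries no Kerberos credentials, so without the pre-loaded tokens the first HDFS call inside run() would fail to authenticate. Side by side (illustrative only):

  // Before: a bare proxy user; fine unsecured, but credential-less inside a YARN container.
  UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());

  // After: the same proxy user, pre-populated with the delegation tokens from the initializer.
  SecurityUtils.createProxyUserAndLoadDelegationTokens(context);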
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index be837ca..3a0d626 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
@@ -62,7 +63,7 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
// In case of incremental import, we need to persist the highest last modified
try {
- UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
FileSystem fs = FileSystem.get(configuration);
Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
@@ -89,6 +90,10 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
LOG.info("Maximal age of file is: " + maxModifiedTime);
context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
}
+
+ // Generate delegation tokens if we are on a secured cluster
+ SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
+
return null;
}
});
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 04acd18..a6551e6 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -34,6 +34,7 @@ import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
@@ -56,8 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
@Override
public void load(final LoaderContext context, final LinkConfiguration linkConfiguration,
final ToJobConfiguration toJobConfig) throws Exception {
- UserGroupInformation.createProxyUser(context.getUser(),
- UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
Configuration conf = new Configuration();
HdfsUtils.contextToConfiguration(context.getContext(), conf);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index 998b903..d01e932 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -45,6 +45,7 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
@@ -83,8 +84,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
final List<Partition> partitions = new ArrayList<>();
try {
- UserGroupInformation.createProxyUser(context.getUser(),
- UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
maxSplitSize = numInputBytes / context.getMaxPartitions();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 2bad23a..858042c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
@@ -50,8 +51,7 @@ public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfigura
final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
try {
- UserGroupInformation.createProxyUser(context.getUser(),
- UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
FileSystem fs = FileSystem.get(configuration);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 5856371..204c978 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
@@ -58,8 +59,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
// Verification that the given HDFS directory either doesn't exist or is empty
try {
- UserGroupInformation.createProxyUser(context.getUser(),
- UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+ SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
FileSystem fs = FileSystem.get(configuration);
Path path = new Path(jobConfig.toJobConfig.outputDirectory);
@@ -76,6 +76,10 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
}
}
}
+
+ // Generate delegation tokens if we are on a secured cluster
+ SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration);
+
return null;
}
});
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
new file mode 100644
index 0000000..0a42936
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.security;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.hdfs.HdfsConstants;
+import org.apache.sqoop.job.etl.TransferableContext;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Sqoop is designed to abstract connectors from the execution engine. Hence the security portion
+ * (like generating and distributing delegation tokens) won't happen automatically for us under the hood,
+ * and we have to do everything manually.
+ */
+public class SecurityUtils {
+
+ private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
+
+ /**
+ * Creates a proxy user for the user who submitted the Sqoop job (e.g. the user who issued the "start job" command)
+ */
+ static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException {
+ return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
+ }
+
+ /**
+ * Creates a proxy user and loads it up with all delegation tokens that we have created ourselves
+ */
+ static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException {
+ UserGroupInformation proxyUser = createProxyUser(context);
+ loadDelegationTokensToUGI(proxyUser, context.getContext());
+
+ return proxyUser;
+ }
+
+ /**
+ * Generate delegation tokens for the current user (this code is supposed to run inside doAs) and store them,
+ * serialized, in the given mutable context.
+ */
+ static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException {
+ if(!UserGroupInformation.isSecurityEnabled()) {
+ LOG.info("Running on unsecured cluster, skipping delegation token generation.");
+ return;
+ }
+
+ // String representation of all tokens that we will create (most likely a single one)
+ List<String> tokens = new LinkedList<>();
+
+ Credentials credentials = new Credentials();
+ TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
+ for (Token token : credentials.getAllTokens()) {
+ LOG.info("Generated token: " + token.toString());
+ tokens.add(serializeToken(token));
+ }
+
+ // The context classes are transferred via "Credentials" rather than with the jobconf, so we're not leaking the delegation token out here
+ if(tokens.size() > 0) {
+ context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));
+ }
+ }
+
+ /**
+ * Loads the delegation tokens that we created and serialized into the mutable context
+ */
+ static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException {
+ String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);
+ if(tokenList == null) {
+ LOG.info("No delegation tokens found");
+ return;
+ }
+
+ for(String stringToken: tokenList.split(" ")) {
+ Token token = deserializeToken(stringToken);
+ LOG.info("Loaded delegation token: " + token.toString());
+ ugi.addToken(token);
+ }
+ }
+
+ /**
+ * Serialize given token into String.
+ *
+ * We'll convert the token to byte[] using its Writable I/O methods and then Base64
+ * encode the bytes into a human-readable string.
+ */
+ static public String serializeToken(Token token) throws IOException {
+ // Serialize the Token to a byte array
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ token.write(dos);
+ baos.flush();
+
+ return Base64.encodeBase64String(baos.toByteArray());
+ }
+
+ /**
+ * Deserialize token from given String.
+ *
+ * See serializeToken for details on how the token is expected to be serialized.
+ */
+ static public Token deserializeToken(String stringToken) throws IOException {
+ Token token = new Token();
+ byte[] tokenBytes = Base64.decodeBase64(stringToken);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
+ DataInputStream dis = new DataInputStream(bais);
+ token.readFields(dis);
+
+ return token;
+ }
+
+ private SecurityUtils() {
+ // Instantiation is prohibited
+ }
+}
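Because each token is Base64-encoded before joining, a single space is a safe separator: the encoded form never contains one. The multi-token case then looks like this (tokenA and tokenB stand for previously obtained org.apache.hadoop.security.token.Token instances; values are made up for illustration):

  // Ship any number of tokens as one space-joined string...
  List<String> tokens = new LinkedList<>();
  tokens.add(SecurityUtils.serializeToken(tokenA));
  tokens.add(SecurityUtils.serializeToken(tokenB));
  String wire = StringUtils.join(tokens, " ");

  // ...and restore them on the other side, one addToken() per entry.
  for (String one : wire.split(" ")) {
    ugi.addToken(SecurityUtils.deserializeToken(one));
  }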
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
new file mode 100644
index 0000000..713c704
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.security;
+
+import org.apache.hadoop.io.Text;
+import org.testng.annotations.Test;
+import org.apache.hadoop.security.token.Token;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TestSecurityUtils {
+
+ @Test
+ public void testTokenSerializationDeserialization() throws Exception {
+ byte[] identifier = "identifier".getBytes();
+ byte[] password = "password".getBytes();
+ Text kind = new Text("kind");
+ Text service = new Text("service");
+
+ Token token = new Token(identifier, password, kind, service);
+ String serializedForm = SecurityUtils.serializeToken(token);
+ assertNotNull(serializedForm);
+
+ Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
+ assertNotNull(deserializedToken);
+
+ assertEquals(identifier, deserializedToken.getIdentifier());
+ assertEquals(password, deserializedToken.getPassword());
+ assertEquals(kind.toString(), deserializedToken.getKind().toString());
+ assertEquals(service.toString(), deserializedToken.getService().toString());
+ }
+
+}
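A natural companion check, not part of this commit, would cover the unsecured path where no token key is present and loading must be a no-op. A sketch, assuming Sqoop's MutableMapContext (an empty mutable context implementation) is available on the test classpath:

  @Test
  public void testLoadingWithNoTokensIsNoop() throws Exception {
    // No DELEGATION_TOKENS key in the context: loading should leave the UGI untouched.
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("sqoop");
    SecurityUtils.loadDelegationTokensToUGI(ugi, new MutableMapContext());
    assertEquals(ugi.getTokens().size(), 0);
  }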