You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by hs...@apache.org on 2015/09/25 06:23:05 UTC
sqoop git commit: SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 a70975c66 -> bf09850c3
SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector
(Jarcec via Hari)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf09850c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf09850c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf09850c
Branch: refs/heads/sqoop2
Commit: bf09850c33721f8d7629a121b15c7f57da9e295f
Parents: a70975c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Sep 24 21:22:17 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Sep 24 21:22:17 2015 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/common/MapContext.java | 2 +-
.../org/apache/sqoop/common/TestMapContext.java | 4 +
.../sqoop/common/TestMutableMapContext.java | 4 +
connector/connector-hdfs/pom.xml | 9 +-
.../connector/hdfs/HdfsFromInitializer.java | 3 +-
.../sqoop/connector/hdfs/HdfsToInitializer.java | 3 +-
pom.xml | 6 +
test/pom.xml | 6 +
.../minicluster/TomcatSqoopMiniCluster.java | 4 +-
.../integration/connector/hdfs/S3Test.java | 132 +++++++++++++++++++
10 files changed, 168 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/main/java/org/apache/sqoop/common/MapContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java
index fc722c0..3167530 100644
--- a/common/src/main/java/org/apache/sqoop/common/MapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java
@@ -36,7 +36,7 @@ public class MapContext implements ImmutableContext {
private final Map<String, String> options;
public MapContext(Map<String, String> options) {
- this.options = options;
+ this.options = options == null ? new HashMap<String, String>() : options;
}
protected Map<String, String> getOptions() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
index 2a27c0c..22f42a4 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static junit.framework.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -40,6 +41,9 @@ public class TestMapContext {
options.put("testkey", "testvalue");
MapContext mc = new MapContext(options);
Assert.assertEquals("testvalue", mc.getString("testkey"));
+
+ MapContext nullMc = new MapContext(null);
+ assertNull(nullMc.getString("random.key.property"));
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
index db7fa34..3cbe7be 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
@@ -50,6 +50,10 @@ public class TestMutableMapContext {
assertEquals(context.getLong("long", -1), 1L);
assertEquals(context.getInt("integer", -1), 13);
assertEquals(context.getBoolean("boolean", false), true);
+
+ context = new MutableMapContext(null);
+ context.setString("key", "value");
+ assertEquals(context.getString("key"), "value");
}
@Test
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 512b54c..a28989c 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -59,6 +59,13 @@ limitations under the License.
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -77,4 +84,4 @@ limitations under the License.
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 6c943a8..e98e02b 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
@@ -51,8 +52,8 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
assert jobConfig.incremental != null;
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+ HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
HdfsUtils.configurationToContext(configuration, context.getContext());
- context.getContext().setAll(linkConfig.linkConfig.configOverrides);
boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 5bb0928..29cf3b9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -47,8 +48,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
assert jobConfig.toJobConfig.outputDirectory != null;
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+ HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
HdfsUtils.configurationToContext(configuration, context.getContext());
- context.getContext().setAll(linkConfig.linkConfig.configOverrides);
boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6e334a7..ef3f5f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -426,6 +426,12 @@ limitations under the License.
<version>${hadoop.2.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.2.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 3e11f59..8218477 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -113,6 +113,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
index 83f42b6..a0ef78a 100644
--- a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
+++ b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
@@ -131,7 +131,9 @@ public class TomcatSqoopMiniCluster extends SqoopMiniCluster {
jar.contains("sqljdbc") || // Microsoft SQL Server driver
jar.contains("libfb303") || // Facebook thrift lib
jar.contains("datanucleus-") || // Data nucleus libs
- jar.contains("google") // Google libraries (guava, ...)
+ jar.contains("google") || // Google libraries (guava, ...)
+ jar.contains("joda-time") || // Joda time
+ jar.contains("aws-java-sdk") // Amazon AWS SDK (S3, ...)
) {
extraClassPath.add(jar);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
new file mode 100644
index 0000000..73a9acf
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Run with something like:
+ *
+ * mvn clean test -pl test -Dtest=S3Test
+ * -Dorg.apache.sqoop.integration.connector.hdfs.s3.bucket=test-bucket
+ * -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
+ * -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
+ */
+public class S3Test extends ConnectorTestCase {
+
+ public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
+ public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";
+ public static final String PROPERTY_SECRET = "org.apache.sqoop.integration.connector.hdfs.s3.secret";
+
+ public static final String BUCKET = System.getProperty(PROPERTY_BUCKET);
+ public static final String ACCESS = System.getProperty(PROPERTY_ACCESS);
+ public static final String SECRET = System.getProperty(PROPERTY_SECRET);
+
+ public static final String BUCKET_URL = "s3a://" + BUCKET;
+
+ @Test
+ public void test() throws Exception {
+ // Verify presence of external configuration
+ assumeNotNull(BUCKET, PROPERTY_BUCKET);
+ assumeNotNull(ACCESS, PROPERTY_ACCESS);
+ assumeNotNull(SECRET, PROPERTY_SECRET);
+
+ createAndLoadTableCities();
+
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set("fs.defaultFS", BUCKET_URL);
+ hadoopConf.set("fs.s3a.access.key", ACCESS);
+ hadoopConf.set("fs.s3a.secret.key", SECRET);
+ FileSystem s3Client = FileSystem.get(hadoopConf);
+
+ // RDBMS link
+ MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+ fillRdbmsLinkConfig(rdbmsLink);
+ saveLink(rdbmsLink);
+
+ // HDFS link
+ MLink hdfsLink = getClient().createLink("hdfs-connector");
+ hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+ Map<String, String> configOverrides = new HashMap<>();
+ configOverrides.put("fs.s3a.access.key", ACCESS);
+ configOverrides.put("fs.s3a.secret.key", SECRET);
+ hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+ saveLink(hdfsLink);
+
+ s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+ // DB -> S3
+ MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+ fillRdbmsFromConfig(db2aws, "id");
+ fillHdfsToConfig(db2aws, ToFormat.TEXT_FILE);
+
+ saveJob(db2aws);
+ executeJob(db2aws);
+
+ // Verifying locally imported data
+ HdfsAsserts.assertMapreduceOutput(s3Client, getMapreduceDirectory(),
+ "1,'USA','2004-10-23','San Francisco'",
+ "2,'USA','2004-10-24','Sunnyvale'",
+ "3,'Czech Republic','2004-10-25','Brno'",
+ "4,'USA','2004-10-26','Palo Alto'"
+ );
+
+ // This re-creates the table completely
+ createTableCities();
+ assertEquals(provider.rowCount(getTableName()), 0);
+
+ // S3 -> DB
+ MJob aws2db = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
+ fillHdfsFromConfig(aws2db);
+ fillRdbmsToConfig(aws2db);
+
+ saveJob(aws2db);
+ executeJob(aws2db);
+
+ // Final verification
+ assertEquals(4L, provider.rowCount(getTableName()));
+ assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+ assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+ assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+ assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+ }
+
+ /**
+ * Skip this test if given value is null
+ */
+ void assumeNotNull(String value, String key) {
+ if(value == null) {
+ throw new SkipException("Missing value for " + key);
+ }
+ }
+
+}