You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by hs...@apache.org on 2015/09/25 06:23:05 UTC

sqoop git commit: SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 a70975c66 -> bf09850c3


SQOOP-2524. Sqoop2: Add S3 support to HDFS Connector

(Jarcec via Hari)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf09850c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf09850c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf09850c

Branch: refs/heads/sqoop2
Commit: bf09850c33721f8d7629a121b15c7f57da9e295f
Parents: a70975c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Sep 24 21:22:17 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Sep 24 21:22:17 2015 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/common/MapContext.java     |   2 +-
 .../org/apache/sqoop/common/TestMapContext.java |   4 +
 .../sqoop/common/TestMutableMapContext.java     |   4 +
 connector/connector-hdfs/pom.xml                |   9 +-
 .../connector/hdfs/HdfsFromInitializer.java     |   3 +-
 .../sqoop/connector/hdfs/HdfsToInitializer.java |   3 +-
 pom.xml                                         |   6 +
 test/pom.xml                                    |   6 +
 .../minicluster/TomcatSqoopMiniCluster.java     |   4 +-
 .../integration/connector/hdfs/S3Test.java      | 132 +++++++++++++++++++
 10 files changed, 168 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/main/java/org/apache/sqoop/common/MapContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java
index fc722c0..3167530 100644
--- a/common/src/main/java/org/apache/sqoop/common/MapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java
@@ -36,7 +36,7 @@ public class MapContext implements ImmutableContext {
   private final Map<String, String> options;
 
   public MapContext(Map<String, String> options) {
-    this.options = options;
+    this.options = options == null ? new HashMap<String, String>() : options;
   }
 
   protected Map<String, String> getOptions() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
index 2a27c0c..22f42a4 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMapContext.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static junit.framework.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -40,6 +41,9 @@ public class TestMapContext {
     options.put("testkey", "testvalue");
     MapContext mc = new MapContext(options);
     Assert.assertEquals("testvalue", mc.getString("testkey"));
+
+    MapContext nullMc = new MapContext(null);
+    assertNull(nullMc.getString("random.key.property"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
index db7fa34..3cbe7be 100644
--- a/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
+++ b/common/src/test/java/org/apache/sqoop/common/TestMutableMapContext.java
@@ -50,6 +50,10 @@ public class TestMutableMapContext {
     assertEquals(context.getLong("long", -1), 1L);
     assertEquals(context.getInt("integer", -1), 13);
     assertEquals(context.getBoolean("boolean", false), true);
+
+    context = new MutableMapContext(null);
+    context.setString("key", "value");
+    assertEquals(context.getString("key"), "value");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 512b54c..a28989c 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -59,6 +59,13 @@ limitations under the License.
       <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
@@ -77,4 +84,4 @@ limitations under the License.
       </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 6c943a8..e98e02b 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
@@ -51,8 +52,8 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
     assert jobConfig.incremental != null;
 
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
-    context.getContext().setAll(linkConfig.linkConfig.configOverrides);
 
     boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 5bb0928..29cf3b9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -47,8 +48,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
     assert jobConfig.toJobConfig.outputDirectory != null;
 
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
-    context.getContext().setAll(linkConfig.linkConfig.configOverrides);
 
     boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6e334a7..ef3f5f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -426,6 +426,12 @@ limitations under the License.
         <version>${hadoop.2.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.2.version}</version>
+        <scope>provided</scope>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hive</groupId>
         <artifactId>hive-jdbc</artifactId>
         <version>${hive.version}</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 3e11f59..8218477 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -113,6 +113,12 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-jdbc</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
index 83f42b6..a0ef78a 100644
--- a/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
+++ b/test/src/main/java/org/apache/sqoop/test/minicluster/TomcatSqoopMiniCluster.java
@@ -131,7 +131,9 @@ public class TomcatSqoopMiniCluster extends SqoopMiniCluster {
          jar.contains("sqljdbc")      || // Microsoft SQL Server driver
          jar.contains("libfb303")     || // Facebook thrift lib
          jar.contains("datanucleus-") || // Data nucleus libs
-         jar.contains("google")          // Google libraries (guava, ...)
+         jar.contains("google")       || // Google libraries (guava, ...)
+         jar.contains("joda-time")    || // Joda time
+         jar.contains("aws-java-sdk")    // Amazon AWS SDK (S3, ...)
        ) {
         extraClassPath.add(jar);
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf09850c/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
new file mode 100644
index 0000000..73a9acf
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Run with something like:
+ *
+ * mvn clean test -pl test -Dtest=S3Test
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.bucket=test-bucket
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.access=AKI...
+ *   -Dorg.apache.sqoop.integration.connector.hdfs.s3.secret=93JKx...
+ */
+public class S3Test extends ConnectorTestCase {
+
+  public static final String PROPERTY_BUCKET = "org.apache.sqoop.integration.connector.hdfs.s3.bucket";
+  public static final String PROPERTY_ACCESS = "org.apache.sqoop.integration.connector.hdfs.s3.access";
+  public static final String PROPERTY_SECRET = "org.apache.sqoop.integration.connector.hdfs.s3.secret";
+
+  public static final String BUCKET = System.getProperty(PROPERTY_BUCKET);
+  public static final String ACCESS = System.getProperty(PROPERTY_ACCESS);
+  public static final String SECRET = System.getProperty(PROPERTY_SECRET);
+
+  public static final String BUCKET_URL = "s3a://" + BUCKET;
+
+  @Test
+  public void test() throws Exception {
+    // Verify presence of external configuration
+    assumeNotNull(BUCKET, PROPERTY_BUCKET);
+    assumeNotNull(ACCESS, PROPERTY_ACCESS);
+    assumeNotNull(SECRET, PROPERTY_SECRET);
+
+    createAndLoadTableCities();
+
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("fs.defaultFS", BUCKET_URL);
+    hadoopConf.set("fs.s3a.access.key", ACCESS);
+    hadoopConf.set("fs.s3a.secret.key", SECRET);
+    FileSystem s3Client = FileSystem.get(hadoopConf);
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+    Map<String, String> configOverrides = new HashMap<>();
+    configOverrides.put("fs.s3a.access.key", ACCESS);
+    configOverrides.put("fs.s3a.secret.key", SECRET);
+    hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+    saveLink(hdfsLink);
+
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+    // DB -> S3
+    MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+    fillRdbmsFromConfig(db2aws, "id");
+    fillHdfsToConfig(db2aws, ToFormat.TEXT_FILE);
+
+    saveJob(db2aws);
+    executeJob(db2aws);
+
+    // Verify the data imported into the S3 bucket
+    HdfsAsserts.assertMapreduceOutput(s3Client, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    // This re-creates the table completely
+    createTableCities();
+    assertEquals(provider.rowCount(getTableName()), 0);
+
+    // S3 -> DB
+    MJob aws2db = getClient().createJob(hdfsLink.getPersistenceId(), rdbmsLink.getPersistenceId());
+    fillHdfsFromConfig(aws2db);
+    fillRdbmsToConfig(aws2db);
+
+    saveJob(aws2db);
+    executeJob(aws2db);
+
+    // Final verification
+    assertEquals(4L, provider.rowCount(getTableName()));
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
+  }
+
+  /**
+   * Skip this test if the given value is null
+   */
+  void assumeNotNull(String value, String key) {
+    if(value == null) {
+      throw new SkipException("Missing value for " + key);
+    }
+  }
+
+}