Posted to commits@sqoop.apache.org by hs...@apache.org on 2015/11/12 20:46:05 UTC

sqoop git commit: SQOOP-2652. Sqoop2: Add test case for S3 incremental import to HDFS

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 8ea1c8580 -> ede322199


SQOOP-2652. Sqoop2: Add test case for S3 incremental import to HDFS

(Jarcec via Hari)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ede32219
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ede32219
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ede32219

Branch: refs/heads/sqoop2
Commit: ede322199d5ef8f18f417d52a633ce744fb2363b
Parents: 8ea1c85
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Nov 12 11:45:52 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Nov 12 11:45:52 2015 -0800

----------------------------------------------------------------------
 .../sqoop/test/testcases/JettyTestCase.java     |  8 ++
 .../integration/connector/hdfs/S3Test.java      | 79 +++++++++++++++++---
 2 files changed, 76 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
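For readers skimming the diff below: the new incremental test builds an s3a-backed Hadoop FileSystem once and reuses it across both test methods, skipping itself when the bucket or credentials are not provided. A minimal, standalone sketch of that client setup follows; the bucket URL, credential sources, and working directory are placeholders, not values from this patch, and hadoop-aws must be on the classpath for the s3a scheme to resolve.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3ClientSetupSketch {
      public static void main(String[] args) throws Exception {
        // Point the default filesystem at the bucket and supply s3a credentials,
        // mirroring setUpS3Client() in the patch.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", "s3a://example-bucket");          // placeholder bucket
        hadoopConf.set("fs.s3a.access.key", System.getenv("S3_ACCESS")); // placeholder credential source
        hadoopConf.set("fs.s3a.secret.key", System.getenv("S3_SECRET")); // placeholder credential source
        FileSystem s3Client = FileSystem.get(hadoopConf);

        // Each test clears its working directory up front, as both test methods do here.
        s3Client.delete(new Path("/sqoop-test-dir"), true);              // placeholder directory
      }
    }

In the actual tests the bucket and keys come from the BUCKET/ACCESS/SECRET constants and assumeNotNull() raises a SkipException when any of them is missing.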


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ede32219/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
index 2349f1c..bd4ba6a 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
@@ -237,6 +238,13 @@ abstract public class JettyTestCase implements ITest {
    * @throws IOException
    */
   protected void createFromFile(String filename, String...lines) throws IOException {
+    createFromFile(hdfsClient, filename, lines);
+  }
+
+  /**
+   * Create file on given HDFS instance with given lines
+   */
+  protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
     HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
   }
 }
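The new createFromFile(FileSystem, String, String...) overload lets the S3 test write its input files through the s3a client instead of the default hdfsClient. HdfsUtils.createFile itself is not part of this diff; a rough, hypothetical equivalent using only the public FileSystem API would look like the sketch below.

    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class FileSystemWriteSketch {
      // Hypothetical stand-in for HdfsUtils.createFile: write the given lines to a
      // new file on the supplied FileSystem (behaves the same against HDFS or s3a).
      public static void writeLines(FileSystem fs, String path, String... lines) throws Exception {
        try (FSDataOutputStream out = fs.create(new Path(path), true);
             OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
          for (String line : lines) {
            writer.write(line);
            writer.write('\n');
          }
        }
      }
    }

The S3 test below calls the overload as createFromFile(s3Client, "input-0001", ...) so the generated input lands in the bucket that the incremental job then reads.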

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ede32219/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
index 73a9acf..7fec310 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.integration.connector.hdfs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
@@ -53,20 +54,26 @@ public class S3Test extends ConnectorTestCase {
 
   public static final String BUCKET_URL = "s3a://" + BUCKET;
 
-  @Test
-  public void test() throws Exception {
-    // Verify presence external configuration
+  // S3 client (HDFS interface) to be used in the tests
+  private FileSystem s3Client;
+
+  public void setUpS3Client() throws Exception {
     assumeNotNull(BUCKET, PROPERTY_BUCKET);
     assumeNotNull(ACCESS, PROPERTY_ACCESS);
     assumeNotNull(SECRET, PROPERTY_SECRET);
 
-    createAndLoadTableCities();
-
     Configuration hadoopConf = new Configuration();
     hadoopConf.set("fs.defaultFS", BUCKET_URL);
     hadoopConf.set("fs.s3a.access.key", ACCESS);
     hadoopConf.set("fs.s3a.secret.key", SECRET);
-    FileSystem s3Client = FileSystem.get(hadoopConf);
+    s3Client = FileSystem.get(hadoopConf);
+  }
+
+  @Test
+  public void testImportExport() throws Exception {
+    setUpS3Client();
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+    createAndLoadTableCities();
 
     // RDBMS link
     MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -82,8 +89,6 @@ public class S3Test extends ConnectorTestCase {
     hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
     saveLink(hdfsLink);
 
-    s3Client.delete(new Path(getMapreduceDirectory()), true);
-
     // DB -> S3
     MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
     fillRdbmsFromConfig(db2aws, "id");
@@ -120,9 +125,61 @@ public class S3Test extends ConnectorTestCase {
     assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
   }
 
-  /**
-   * Skip this test if given value is null
-   */
+  @Test
+  public void testIncrementalRead() throws Exception {
+    setUpS3Client();
+    s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+    // S3 link
+    MLink s3Link = getClient().createLink("hdfs-connector");
+    s3Link.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+    Map<String, String> configOverrides = new HashMap<>();
+    configOverrides.put("fs.s3a.access.key", ACCESS);
+    configOverrides.put("fs.s3a.secret.key", SECRET);
+    s3Link.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+    saveLink(s3Link);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // S3 -> HDFS
+    MJob aws2hdfs = getClient().createJob(s3Link.getPersistenceId(), hdfsLink.getPersistenceId());
+    fillHdfsFromConfig(aws2hdfs);
+    aws2hdfs.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
+
+    fillHdfsToConfig(aws2hdfs, ToFormat.TEXT_FILE);
+    aws2hdfs.getToJobConfig().getBooleanInput("toJobConfig.appendMode").setValue(true);
+    saveJob(aws2hdfs);
+
+    // First import (first file)
+    createFromFile(s3Client, "input-0001",
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'"
+    );
+    executeJob(aws2hdfs);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'"
+    );
+
+    // Second import (second file)
+    createFromFile(s3Client, "input-0002",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+    executeJob(aws2hdfs);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+  }
+
   void assumeNotNull(String value, String key) {
     if(value == null) {
       throw new SkipException("Missing value for " + key);