Posted to commits@sqoop.apache.org by hs...@apache.org on 2015/11/12 20:46:05 UTC
sqoop git commit: SQOOP-2652. Sqoop2: Add test case for S3 incremental import to HDFS
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 8ea1c8580 -> ede322199
SQOOP-2652. Sqoop2: Add test case for S3 incremental import to HDFS
(Jarcec via Hari)
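
[Editor's note: the core of the new test setup is pointing a Hadoop FileSystem
client at the bucket through the s3a scheme, the pattern the patch factors into
setUpS3Client() in the diff below. A minimal standalone sketch, assuming a
placeholder bucket URL and placeholder credential properties; the test itself
reads these from its PROPERTY_BUCKET, PROPERTY_ACCESS and PROPERTY_SECRET
constants:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    class S3ClientSketch {
      // Placeholder bucket and credential property names, for illustration only.
      static FileSystem s3Client() throws Exception {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", "s3a://my-test-bucket");
        hadoopConf.set("fs.s3a.access.key", System.getProperty("aws.access.key"));
        hadoopConf.set("fs.s3a.secret.key", System.getProperty("aws.secret.key"));
        return FileSystem.get(hadoopConf);
      }
    }

Because fs.defaultFS points at the bucket, the FileSystem.get() call returns an
S3A-backed client, which the test then uses exactly like an HDFS client.]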
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ede32219
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ede32219
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ede32219
Branch: refs/heads/sqoop2
Commit: ede322199d5ef8f18f417d52a633ce744fb2363b
Parents: 8ea1c85
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Nov 12 11:45:52 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Nov 12 11:45:52 2015 -0800
----------------------------------------------------------------------
.../sqoop/test/testcases/JettyTestCase.java | 8 ++
.../integration/connector/hdfs/S3Test.java | 79 +++++++++++++++++---
2 files changed, 76 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ede32219/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
index 2349f1c..bd4ba6a 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/JettyTestCase.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
@@ -237,6 +238,13 @@ abstract public class JettyTestCase implements ITest {
* @throws IOException
*/
protected void createFromFile(String filename, String...lines) throws IOException {
+ createFromFile(hdfsClient, filename, lines);
+ }
+
+ /**
+ * Create file on given HDFS instance with given lines
+ */
+ protected void createFromFile(FileSystem hdfsClient, String filename, String...lines) throws IOException {
HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
}
}
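
[Editor's note: the one-argument createFromFile keeps its signature and now
delegates to the new overload with the default hdfsClient, so existing callers
are unchanged while tests such as S3Test can seed files on any FileSystem
instance. Roughly what HdfsUtils.createFile does under the hood -- a sketch
using plain Hadoop APIs, not the actual HdfsUtils implementation:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class FileSeedingSketch {
      // Write each line to the given path, creating or overwriting the file
      // on whichever FileSystem is passed in (HDFS, S3A, ...).
      static void createFile(FileSystem fs, String path, String... lines) throws IOException {
        try (FSDataOutputStream out = fs.create(new Path(path), true)) {
          for (String line : lines) {
            out.writeBytes(line + "\n");
          }
        }
      }
    }
]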
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ede32219/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
index 73a9acf..7fec310 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/S3Test.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.integration.connector.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
@@ -53,20 +54,26 @@ public class S3Test extends ConnectorTestCase {
public static final String BUCKET_URL = "s3a://" + BUCKET;
- @Test
- public void test() throws Exception {
- // Verify presence external configuration
+ // S3 client (HDFS interface) to be used in the tests
+ private FileSystem s3Client;
+
+ public void setUpS3Client() throws Exception {
assumeNotNull(BUCKET, PROPERTY_BUCKET);
assumeNotNull(ACCESS, PROPERTY_ACCESS);
assumeNotNull(SECRET, PROPERTY_SECRET);
- createAndLoadTableCities();
-
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaultFS", BUCKET_URL);
hadoopConf.set("fs.s3a.access.key", ACCESS);
hadoopConf.set("fs.s3a.secret.key", SECRET);
- FileSystem s3Client = FileSystem.get(hadoopConf);
+ s3Client = FileSystem.get(hadoopConf);
+ }
+
+ @Test
+ public void testImportExport() throws Exception {
+ setUpS3Client();
+ s3Client.delete(new Path(getMapreduceDirectory()), true);
+ createAndLoadTableCities();
// RDBMS link
MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
@@ -82,8 +89,6 @@ public class S3Test extends ConnectorTestCase {
hdfsLink.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
saveLink(hdfsLink);
- s3Client.delete(new Path(getMapreduceDirectory()), true);
-
// DB -> S3
MJob db2aws = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
fillRdbmsFromConfig(db2aws, "id");
@@ -120,9 +125,61 @@ public class S3Test extends ConnectorTestCase {
assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
}
- /**
- * Skip this test if given value is null
- */
+ @Test
+ public void testIncrementalRead() throws Exception {
+ setUpS3Client();
+ s3Client.delete(new Path(getMapreduceDirectory()), true);
+
+ // S3 link
+ MLink s3Link = getClient().createLink("hdfs-connector");
+ s3Link.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(BUCKET_URL);
+ Map<String, String> configOverrides = new HashMap<>();
+ configOverrides.put("fs.s3a.access.key", ACCESS);
+ configOverrides.put("fs.s3a.secret.key", SECRET);
+ s3Link.getConnectorLinkConfig().getMapInput("linkConfig.configOverrides").setValue(configOverrides);
+ saveLink(s3Link);
+
+ // HDFS link
+ MLink hdfsLink = getClient().createLink("hdfs-connector");
+ fillHdfsLink(hdfsLink);
+ saveLink(hdfsLink);
+
+ // S3 -> HDFS
+ MJob aws2hdfs = getClient().createJob(s3Link.getPersistenceId(), hdfsLink.getPersistenceId());
+ fillHdfsFromConfig(aws2hdfs);
+ aws2hdfs.getFromJobConfig().getEnumInput("incremental.incrementalType").setValue(IncrementalType.NEW_FILES);
+
+ fillHdfsToConfig(aws2hdfs, ToFormat.TEXT_FILE);
+ aws2hdfs.getToJobConfig().getBooleanInput("toJobConfig.appendMode").setValue(true);
+ saveJob(aws2hdfs);
+
+ // First import (first file)
+ createFromFile(s3Client, "input-0001",
+ "1,'USA','2004-10-23','San Francisco'",
+ "2,'USA','2004-10-24','Sunnyvale'"
+ );
+ executeJob(aws2hdfs);
+
+ HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+ "1,'USA','2004-10-23','San Francisco'",
+ "2,'USA','2004-10-24','Sunnyvale'"
+ );
+
+ // Second import (second file)
+ createFromFile(s3Client, "input-0002",
+ "3,'Czech Republic','2004-10-25','Brno'",
+ "4,'USA','2004-10-26','Palo Alto'"
+ );
+ executeJob(aws2hdfs);
+
+ HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(),
+ "1,'USA','2004-10-23','San Francisco'",
+ "2,'USA','2004-10-24','Sunnyvale'",
+ "3,'Czech Republic','2004-10-25','Brno'",
+ "4,'USA','2004-10-26','Palo Alto'"
+ );
+ }
+
void assumeNotNull(String value, String key) {
if(value == null) {
throw new SkipException("Missing value for " + key);
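
[Editor's note: the assumeNotNull guard works because TestNG reports a test
that throws SkipException as skipped rather than failed, so both S3 tests
become no-ops on machines where the bucket and credentials are not supplied.
A standalone sketch of the same pattern, with a hypothetical property name
(S3Test reads its own PROPERTY_* constants instead):

    import org.testng.SkipException;
    import org.testng.annotations.Test;

    public class SkipWithoutS3Test {
      // Hypothetical system property, for illustration only.
      private static final String BUCKET = System.getProperty("test.s3.bucket");

      @Test
      public void needsBucket() {
        if (BUCKET == null) {
          throw new SkipException("Missing value for test.s3.bucket");
        }
        // ... test body that talks to the bucket ...
      }
    }
]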