You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by af...@apache.org on 2016/12/06 19:31:32 UTC

sqoop git commit: SQOOP-2941: Sqoop2: Provide option to re-create target directory if it exists in HDFS connector

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 d9412c2b3 -> 42e7443d5


SQOOP-2941: Sqoop2: Provide option to re-create target directory if it exists in HDFS connector

(Boglarka Egyed via Abraham Fine)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/42e7443d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/42e7443d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/42e7443d

Branch: refs/heads/sqoop2
Commit: 42e7443d56f9527bb2c34fd6a20ce7abcae0a18c
Parents: d9412c2
Author: Abraham Fine <af...@apache.org>
Authored: Tue Dec 6 11:30:09 2016 -0800
Committer: Abraham Fine <af...@apache.org>
Committed: Tue Dec 6 11:30:09 2016 -0800

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  8 +-
 .../hdfs/configuration/ToJobConfig.java         |  2 +
 .../resources/hdfs-connector-config.properties  |  5 ++
 .../sqoop/connector/hdfs/TestToInitializer.java | 85 ++++++++++++------
 .../connector/hdfs/OutputDirectoryTest.java     | 94 ++++++++++----------
 5 files changed, 120 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 70e0fde..256faf7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -54,6 +54,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
     final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
+    final boolean deleteOutputDirectory = Boolean.TRUE.equals(jobConfig.toJobConfig.deleteOutputDirectory);
+
 
     // Verification that given HDFS directory either don't exists or is empty
     try {
@@ -70,7 +72,11 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
             if (fs.isDirectory(path) && !appendMode) {
               FileStatus[] fileStatuses = fs.listStatus(path);
               if (fileStatuses.length != 0) {
-                throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+                if (deleteOutputDirectory) {
+                  fs.delete(path, true);
+                } else {
+                  throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+                }
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index d76ba5f..dc4b285 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -43,6 +43,8 @@ public class ToJobConfig {
 
   @Input public Boolean appendMode;
 
+  @Input public Boolean deleteOutputDirectory;
+
   public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> {
     @Override
     public void validate(ToJobConfig conf) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 29efced..3b24a65 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -59,6 +59,11 @@ toJobConfig.appendMode.example = true
 toJobConfig.appendMode.help = If set to false, job will fail if output directory already exists. If set to true \
   then imported data will be stored to already existing and possibly non empty directory.
 
+toJobConfig.deleteOutputDirectory.label = Delete output directory
+toJobConfig.deleteOutputDirectory.example = true
+toJobConfig.deleteOutputDirectory.help = If set to false, job will fail if output directory is not empty. If set to true \
+  then existing non-empty output directory will be deleted before job execution, unless append mode is enabled.
+
 toJobConfig.overrideNullValue.label = Override null value
 toJobConfig.overrideNullValue.example = true
 toJobConfig.overrideNullValue.help = If set to true, then the null value will be overridden with the value set in \

http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 5441702..673f447 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -36,19 +37,28 @@ import static org.testng.Assert.assertTrue;
  */
 public class TestToInitializer extends TestHdfsBase {
 
+  private LinkConfiguration linkConfig;
+  private ToJobConfiguration jobConfig;
+  private InitializerContext initializerContext;
+  private Initializer initializer;
+
+  @BeforeMethod
+  public void setup() {
+    linkConfig = new LinkConfiguration();
+    jobConfig = new ToJobConfiguration();
+
+    linkConfig.linkConfig.uri = "file:///";
+
+    initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+    initializer= new HdfsToInitializer();
+  }
+
   @Test
   public void testWorkDirectoryBeingSet() {
     final String TARGET_DIR = "/target/directory";
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
@@ -60,50 +70,69 @@ public class TestToInitializer extends TestHdfsBase {
     File file = File.createTempFile("MastersOfOrion", ".txt");
     file.createNewFile();
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
   }
 
   @Test(expectedExceptions = SqoopException.class)
-  public void testOutputDirectoryIsNotEmpty() throws Exception {
+  public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
     File dir = Files.createTempDir();
     File file = File.createTempFile("MastersOfOrion", ".txt", dir);
 
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.deleteOutputDirectory = true;
 
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath() + "/."));
   }
 
   @Test
   public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception {
     File dir = Files.createTempDir();
-    File file = File.createTempFile("MastersOfOrion", ".txt", dir);
-
-    LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
 
-    linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
     jobConfig.toJobConfig.appendMode = true;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath()));
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithoutDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.appendMode = false;
+
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithDeleteOption() throws Exception {
+    File dir = Files.createTempDir();
+    File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.appendMode = false;
+    jobConfig.toJobConfig.deleteOutputDirectory = true;
 
-    Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
index 722c126..90b0d15 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -30,6 +30,9 @@ import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProv
 import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
 import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
 import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,12 +42,12 @@ import static org.testng.Assert.fail;
 
 @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
 public class OutputDirectoryTest extends SqoopTestCase {
-  @Test
-  public void testOutputDirectoryIsAFile() throws Exception {
-    createAndLoadTableCities();
 
-    hdfsClient.delete(new Path(getMapreduceDirectory()), true);
-    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+  private MJob job;
+
+  @BeforeMethod
+  public void setup() {
+    createAndLoadTableCities();
 
     // RDBMS link
     MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
@@ -57,10 +60,22 @@ public class OutputDirectoryTest extends SqoopTestCase {
     saveLink(hdfsConnection);
 
     // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+    job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
 
     // Set rdbms "FROM" config
     fillRdbmsFromConfig(job, "id");
+  }
+
+  @AfterMethod
+  public void cleanUp() {
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsAFile() throws Exception {
+
+    hdfsClient.delete(new Path(getMapreduceDirectory()), true);
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
 
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -71,33 +86,14 @@ public class OutputDirectoryTest extends SqoopTestCase {
       HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
       "is a file"
     );
-
-    dropTable();
   }
 
   @Test
-  public void testOutputDirectoryIsNotEmpty() throws Exception {
-    createAndLoadTableCities();
+  public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
 
     hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
     hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
 
-    // RDBMS link
-    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
-    fillRdbmsLinkConfig(rdbmsConnection);
-    saveLink(rdbmsConnection);
-
-    // HDFS link
-    MLink hdfsConnection = getClient().createLink("hdfs-connector");
-    fillHdfsLink(hdfsConnection);
-    saveLink(hdfsConnection);
-
-    // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
-
-    // Set rdbms "FROM" config
-    fillRdbmsFromConfig(job, "id");
-
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
 
@@ -107,31 +103,41 @@ public class OutputDirectoryTest extends SqoopTestCase {
       HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
       "is not empty"
     );
-
-    dropTable();
   }
 
   @Test
-  public void testOutputDirectoryIsEmpty() throws Exception {
-    createAndLoadTableCities();
+  public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
 
     hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
 
-    // RDBMS link
-    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
-    fillRdbmsLinkConfig(rdbmsConnection);
-    saveLink(rdbmsConnection);
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+        .setValue(getMapreduceDirectory());
 
-    // HDFS link
-    MLink hdfsConnection = getClient().createLink("hdfs-connector");
-    fillHdfsLink(hdfsConnection);
-    saveLink(hdfsConnection);
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory").setValue(true);
 
-    // Job creation
-    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
 
-    // Set rdbms "FROM" config
-    fillRdbmsFromConfig(job, "id");
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+        "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
+        "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
+        "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
+        "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+        "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
+    );
+  }
+
+  @Test
+  public void testOutputDirectoryIsEmpty() throws Exception {
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
 
     // fill the hdfs "TO" config
     fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -148,12 +154,10 @@ public class OutputDirectoryTest extends SqoopTestCase {
       "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
       "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
     );
-
-    dropTable();
   }
 
   public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
-    // Try to execute the job and verify that the it was not successful
+    // Try to execute the job and verify that it was not successful
     try {
       executeJob(job);
       fail("Expected failure in the job submission.");