Posted to commits@sqoop.apache.org by af...@apache.org on 2016/12/06 19:31:32 UTC
sqoop git commit: SQOOP-2941: Sqoop2: Provide option to re-create target directory if it exists in HDFS connector
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 d9412c2b3 -> 42e7443d5
SQOOP-2941: Sqoop2: Provide option to re-create target directory if it exists in HDFS connector
(Boglarka Egyed via Abraham Fine)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/42e7443d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/42e7443d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/42e7443d
Branch: refs/heads/sqoop2
Commit: 42e7443d56f9527bb2c34fd6a20ce7abcae0a18c
Parents: d9412c2
Author: Abraham Fine <af...@apache.org>
Authored: Tue Dec 6 11:30:09 2016 -0800
Committer: Abraham Fine <af...@apache.org>
Committed: Tue Dec 6 11:30:09 2016 -0800
----------------------------------------------------------------------
.../sqoop/connector/hdfs/HdfsToInitializer.java | 8 +-
.../hdfs/configuration/ToJobConfig.java | 2 +
.../resources/hdfs-connector-config.properties | 5 ++
.../sqoop/connector/hdfs/TestToInitializer.java | 85 ++++++++++++------
.../connector/hdfs/OutputDirectoryTest.java | 94 ++++++++++----------
5 files changed, 120 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
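For reference, once this change is in place a client can opt into the new behaviour through the "toJobConfig.deleteOutputDirectory" input. A minimal sketch based on the integration test below (the job object and the saveJob/executeJob helpers come from the test harness; the output path is just an example):

    // HDFS "TO" side: point the job at an existing, possibly non-empty directory
    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory").setValue("/target/directory");
    // New option: delete the existing output directory before the job runs
    // instead of failing with GENERIC_HDFS_CONNECTOR_0007
    job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory").setValue(true);
    saveJob(job);
    executeJob(job);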
http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 70e0fde..256faf7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -54,6 +54,8 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
HdfsUtils.configurationToContext(configuration, context.getContext());
final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
+ final boolean deleteOutputDirectory = Boolean.TRUE.equals(jobConfig.toJobConfig.deleteOutputDirectory);
+
// Verification that the given HDFS directory either doesn't exist or is empty
try {
@@ -70,7 +72,11 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
if (fs.isDirectory(path) && !appendMode) {
FileStatus[] fileStatuses = fs.listStatus(path);
if (fileStatuses.length != 0) {
- throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+ if (deleteOutputDirectory) {
+ fs.delete(path, true);
+ } else {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+ }
}
}
}
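Net effect of the check above when the output directory already exists and is not empty: append mode still takes precedence and keeps the existing content, the new deleteOutputDirectory flag removes the directory recursively before the import, and with neither flag set the job keeps failing as before. A condensed sketch of that decision (hypothetical helper, not part of the patch):

    // Hypothetical condensation of the behaviour for a non-empty output directory:
    void handleNonEmptyOutputDirectory(FileSystem fs, Path path,
        boolean appendMode, boolean deleteOutputDirectory) throws IOException {
      if (appendMode) {
        return;                  // keep existing files, write new ones alongside them
      }
      if (deleteOutputDirectory) {
        fs.delete(path, true);   // recursive delete; the job re-creates the directory
        return;
      }
      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007,
          "Output directory is not empty");
    }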
http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index d76ba5f..dc4b285 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -43,6 +43,8 @@ public class ToJobConfig {
@Input public Boolean appendMode;
+ @Input public Boolean deleteOutputDirectory;
+
public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> {
@Override
public void validate(ToJobConfig conf) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 29efced..3b24a65 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -59,6 +59,11 @@ toJobConfig.appendMode.example = true
toJobConfig.appendMode.help = If set to false, job will fail if output directory already exists. If set to true \
then imported data will be stored to already existing and possibly non empty directory.
+toJobConfig.deleteOutputDirectory.label = Delete output directory
+toJobConfig.deleteOutputDirectory.example = true
+toJobConfig.deleteOutputDirectory.help = If set to false, job will fail if output directory already exists. If set to true \
+ then existing output directory will be deleted before job execution.
+
toJobConfig.overrideNullValue.label = Override null value
toJobConfig.overrideNullValue.example = true
toJobConfig.overrideNullValue.help = If set to true, then the null value will be overridden with the value set in \
http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 5441702..673f447 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.File;
@@ -36,19 +37,28 @@ import static org.testng.Assert.assertTrue;
*/
public class TestToInitializer extends TestHdfsBase {
+ private LinkConfiguration linkConfig;
+ private ToJobConfiguration jobConfig;
+ private InitializerContext initializerContext;
+ private Initializer initializer;
+
+ @BeforeMethod
+ public void setup() {
+ linkConfig = new LinkConfiguration();
+ jobConfig = new ToJobConfiguration();
+
+ linkConfig.linkConfig.uri = "file:///";
+
+ initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+ initializer = new HdfsToInitializer();
+ }
+
@Test
public void testWorkDirectoryBeingSet() {
final String TARGET_DIR = "/target/directory";
- LinkConfiguration linkConfig = new LinkConfiguration();
- ToJobConfiguration jobConfig = new ToJobConfiguration();
-
- linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
- InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
- Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
@@ -60,50 +70,69 @@ public class TestToInitializer extends TestHdfsBase {
File file = File.createTempFile("MastersOfOrion", ".txt");
file.createNewFile();
- LinkConfiguration linkConfig = new LinkConfiguration();
- ToJobConfiguration jobConfig = new ToJobConfiguration();
-
- linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
- InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
-
- Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
}
@Test(expectedExceptions = SqoopException.class)
- public void testOutputDirectoryIsNotEmpty() throws Exception {
+ public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
File dir = Files.createTempDir();
File file = File.createTempFile("MastersOfOrion", ".txt", dir);
- LinkConfiguration linkConfig = new LinkConfiguration();
- ToJobConfiguration jobConfig = new ToJobConfiguration();
-
- linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
- InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+ }
+
+ @Test
+ public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
+ File dir = Files.createTempDir();
+ File.createTempFile("MastersOfOrion", ".txt", dir);
+
+ jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+ jobConfig.toJobConfig.deleteOutputDirectory = true;
- Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+ assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath() + "/."));
}
@Test
public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception {
File dir = Files.createTempDir();
- File file = File.createTempFile("MastersOfOrion", ".txt", dir);
-
- LinkConfiguration linkConfig = new LinkConfiguration();
- ToJobConfiguration jobConfig = new ToJobConfiguration();
+ File.createTempFile("MastersOfOrion", ".txt", dir);
- linkConfig.linkConfig.uri = "file:///";
jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
jobConfig.toJobConfig.appendMode = true;
- InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+ assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath()));
+ }
+
+ @Test(expectedExceptions = SqoopException.class)
+ public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithoutDeleteOption() throws Exception {
+ File dir = Files.createTempDir();
+ File.createTempFile("MastersOfOrion", ".txt", dir);
+
+ jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+ jobConfig.toJobConfig.appendMode = false;
+
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+ }
+
+ @Test
+ public void testOutputDirectoryIsNotEmptyWithoutIncrementalWithDeleteOption() throws Exception {
+ File dir = Files.createTempDir();
+ File.createTempFile("MastersOfOrion", ".txt", dir);
+
+ jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+ jobConfig.toJobConfig.appendMode = false;
+ jobConfig.toJobConfig.deleteOutputDirectory = true;
- Initializer initializer = new HdfsToInitializer();
initializer.initialize(initializerContext, linkConfig, jobConfig);
assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
http://git-wip-us.apache.org/repos/asf/sqoop/blob/42e7443d/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
index 722c126..90b0d15 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -30,6 +30,9 @@ import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProv
import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -39,12 +42,12 @@ import static org.testng.Assert.fail;
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
public class OutputDirectoryTest extends SqoopTestCase {
- @Test
- public void testOutputDirectoryIsAFile() throws Exception {
- createAndLoadTableCities();
- hdfsClient.delete(new Path(getMapreduceDirectory()), true);
- hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+ private MJob job;
+
+ @BeforeMethod
+ public void setup() {
+ createAndLoadTableCities();
// RDBMS link
MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
@@ -57,10 +60,22 @@ public class OutputDirectoryTest extends SqoopTestCase {
saveLink(hdfsConnection);
// Job creation
- MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+ job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
// Set rdbms "FROM" config
fillRdbmsFromConfig(job, "id");
+ }
+
+ @AfterMethod
+ public void cleanUp() {
+ dropTable();
+ }
+
+ @Test
+ public void testOutputDirectoryIsAFile() throws Exception {
+
+ hdfsClient.delete(new Path(getMapreduceDirectory()), true);
+ hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -71,33 +86,14 @@ public class OutputDirectoryTest extends SqoopTestCase {
HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
"is a file"
);
-
- dropTable();
}
@Test
- public void testOutputDirectoryIsNotEmpty() throws Exception {
- createAndLoadTableCities();
+ public void testOutputDirectoryIsNotEmptyWithoutDeleteOption() throws Exception {
hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
- // RDBMS link
- MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
- fillRdbmsLinkConfig(rdbmsConnection);
- saveLink(rdbmsConnection);
-
- // HDFS link
- MLink hdfsConnection = getClient().createLink("hdfs-connector");
- fillHdfsLink(hdfsConnection);
- saveLink(hdfsConnection);
-
- // Job creation
- MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
-
- // Set rdbms "FROM" config
- fillRdbmsFromConfig(job, "id");
-
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -107,31 +103,41 @@ public class OutputDirectoryTest extends SqoopTestCase {
HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
"is not empty"
);
-
- dropTable();
}
@Test
- public void testOutputDirectoryIsEmpty() throws Exception {
- createAndLoadTableCities();
+ public void testOutputDirectoryIsNotEmptyWithDeleteOption() throws Exception {
hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+ hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
- // RDBMS link
- MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
- fillRdbmsLinkConfig(rdbmsConnection);
- saveLink(rdbmsConnection);
+ // fill the hdfs "TO" config
+ fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+ job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+ .setValue(getMapreduceDirectory());
- // HDFS link
- MLink hdfsConnection = getClient().createLink("hdfs-connector");
- fillHdfsLink(hdfsConnection);
- saveLink(hdfsConnection);
+ fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+ job.getToJobConfig().getBooleanInput("toJobConfig.deleteOutputDirectory").setValue(true);
- // Job creation
- MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
- // Set rdbms "FROM" config
- fillRdbmsFromConfig(job, "id");
+ saveJob(job);
+
+ executeJob(job);
+
+ // Assert correct output
+ assertTo(
+ "1,'USA','2004-10-23 00:00:00.000','San Francisco'",
+ "2,'USA','2004-10-24 00:00:00.000','Sunnyvale'",
+ "3,'Czech Republic','2004-10-25 00:00:00.000','Brno'",
+ "4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
+ "5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
+ );
+ }
+
+ @Test
+ public void testOutputDirectoryIsEmpty() throws Exception {
+
+ hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
// fill the hdfs "TO" config
fillHdfsToConfig(job, ToFormat.TEXT_FILE);
@@ -148,12 +154,10 @@ public class OutputDirectoryTest extends SqoopTestCase {
"4,'USA','2004-10-26 00:00:00.000','Palo Alto'",
"5,'USA','2004-10-27 00:00:00.000','Martha\\'s Vineyard'"
);
-
- dropTable();
}
public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
- // Try to execute the job and verify that the it was not successful
+ // Try to execute the job and verify that it was not successful
try {
executeJob(job);
fail("Expected failure in the job submission.");