You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/03/18 22:36:52 UTC
sqoop git commit: SQOOP-2228: Sqoop2: HDFS Connector: Import data to temporary directory before moving them to target directory
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 f98fc2885 -> a9f7b3ddd
SQOOP-2228: Sqoop2: HDFS Connector: Import data to temporary directory before moving them to target directory
(Jarek Jarcec Cecho via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a9f7b3dd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a9f7b3dd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a9f7b3dd
Branch: refs/heads/sqoop2
Commit: a9f7b3ddd0b00557901dd2268b66b799a4f1c684
Parents: f98fc28
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Mar 18 14:36:05 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Mar 18 14:36:05 2015 -0700
----------------------------------------------------------------------
.../sqoop/error/code/HdfsConnectorError.java | 2 +
.../sqoop/connector/hdfs/HdfsConstants.java | 3 +
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 2 +-
.../sqoop/connector/hdfs/HdfsToDestroyer.java | 45 ++++++--
.../sqoop/connector/hdfs/HdfsToInitializer.java | 23 ++--
.../apache/sqoop/connector/hdfs/TestLoader.java | 4 +-
.../sqoop/connector/hdfs/TestToDestroyer.java | 114 +++++++++++++++++++
.../sqoop/connector/hdfs/TestToInitializer.java | 21 ++++
8 files changed, 196 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index c85e7fc..6cd66cc 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -36,6 +36,8 @@ public enum HdfsConnectorError implements ErrorCode{
GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
+ GENERIC_HDFS_CONNECTOR_0008("Error occurs during destroyer run"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index bd74bec..9d20a79 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -30,4 +30,7 @@ public final class HdfsConstants extends Constants {
public static final char DEFAULT_RECORD_DELIMITER = '\n';
+ public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
+
+ public static final String WORK_DIRECTORY = PREFIX + "work_dir";
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 0ced6d0..96913e8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -57,7 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
HdfsUtils.contextToConfiguration(context.getContext(), conf);
DataReader reader = context.getDataReader();
- String directoryName = toJobConfig.toJobConfig.outputDirectory;
+ String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
String codecname = getCompressionCodecName(toJobConfig);
CompressionCodec codec = null;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 3c85be8..11b2ae3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -17,22 +17,51 @@
*/
package org.apache.sqoop.connector.hdfs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
+import java.io.IOException;
+
public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+
+ private static final Logger LOG = Logger.getLogger(HdfsToDestroyer.class);
+
/**
- * Callback to clean up after job execution.
- *
- * @param context Destroyer context
- * @param linkConfig link configuration object
- * @param jobConfig TO job configuration object
+ * {@inheritDoc}
*/
@Override
- public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
- ToJobConfiguration jobConfig) {
- // do nothing at this point
+ public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+ Configuration configuration = new Configuration();
+ HdfsUtils.contextToConfiguration(context.getContext(), configuration);
+
+ String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY);
+ Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
+
+ try {
+ FileSystem fs = FileSystem.get(configuration);
+
+ // If we succeeded, we need to move all files from working directory
+ if(context.isSuccess()) {
+ FileStatus[] fileStatuses = fs.listStatus(new Path(workingDirectory));
+ for (FileStatus status : fileStatuses) {
+ LOG.info("Committing file: " + status.getPath().toString() + " of size " + status.getLen());
+ fs.rename(status.getPath(), new Path(targetDirectory, status.getPath().getName()));
+ }
+ }
+
+ // Clean up working directory
+ fs.delete(new Path(workingDirectory), true);
+ } catch (IOException e) {
+ throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0008, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 83bac27..05ceb23 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -29,19 +30,22 @@ import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import java.io.IOException;
+import java.util.UUID;
public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
+
+ private static final Logger LOG = Logger.getLogger(HdfsToInitializer.class);
+
/**
- * Initialize new submission based on given configuration properties. Any
- * needed temporary values might be saved to context object and they will be
- * promoted to all other part of the workflow automatically.
- *
- * @param context Initializer context object
- * @param linkConfig link configuration object
- * @param jobConfig TO job configuration object
+ * {@inheritDoc}
*/
@Override
public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+ assert jobConfig != null;
+ assert linkConfig != null;
+ assert jobConfig.toJobConfig != null;
+ assert jobConfig.toJobConfig.outputDirectory != null;
+
Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
HdfsUtils.configurationToContext(configuration, context.getContext());
@@ -65,5 +69,10 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
} catch (IOException e) {
throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
}
+
+ // Building working directory
+ String workingDirectory = jobConfig.toJobConfig.outputDirectory + "/." + UUID.randomUUID();
+ LOG.info("Using working directory: " + workingDirectory);
+ context.getContext().setString(HdfsConstants.WORK_DIRECTORY, workingDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b7c81ec..3b81715 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -103,6 +103,7 @@ public class TestLoader extends TestHdfsBase {
.addColumn(new Text("col3"));
Configuration conf = new Configuration();
+ conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
private long index = 0L;
@@ -128,7 +129,6 @@ public class TestLoader extends TestHdfsBase {
}, null);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
- jobConf.toJobConfig.outputDirectory = outputDirectory;
jobConf.toJobConfig.compression = compression;
jobConf.toJobConfig.outputFormat = outputFormat;
Path outputPath = new Path(outputDirectory);
@@ -157,6 +157,7 @@ public class TestLoader extends TestHdfsBase {
.addColumn(new Text("col4"));
Configuration conf = new Configuration();
+ conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
private long index = 0L;
@@ -187,7 +188,6 @@ public class TestLoader extends TestHdfsBase {
}, schema);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
- jobConf.toJobConfig.outputDirectory = outputDirectory;
jobConf.toJobConfig.compression = compression;
jobConf.toJobConfig.outputFormat = outputFormat;
jobConf.toJobConfig.overrideNullValue = true;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
new file mode 100644
index 0000000..e1f416e
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ */
+public class TestToDestroyer {
+
+ @Test
+ public void testDestroyOnSuccess() throws Exception {
+ File workDir = Files.createTempDir();
+ File targetDir = Files.createTempDir();
+
+ File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+ File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+ File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ ToJobConfiguration jobConfig = new ToJobConfiguration();
+ jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+ MutableContext context = new MutableMapContext();
+ context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+ Destroyer destroyer = new HdfsToDestroyer();
+ destroyer.destroy(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+
+ File[] files = targetDir.listFiles();
+
+ // We should see three files in the target directory
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+
+ // With expected file names
+ boolean f1 = false, f2 = false, f3 = false;
+ for(File f : files) {
+ if(f.getName().startsWith("part-01-")) {
+ f1 = true;
+ }
+ if(f.getName().startsWith("part-02-")) {
+ f2 = true;
+ }
+ if(f.getName().startsWith("part-03-")) {
+ f3 = true;
+ }
+ }
+ assertTrue(f1);
+ assertTrue(f2);
+ assertTrue(f3);
+
+ // And target directory should not exists
+ assertFalse(workDir.exists());
+ }
+ @Test
+ public void testDestroyOnFailure() throws Exception {
+ File workDir = Files.createTempDir();
+ File targetDir = Files.createTempDir();
+
+ File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+ File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+ File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ ToJobConfiguration jobConfig = new ToJobConfiguration();
+ jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+ MutableContext context = new MutableMapContext();
+ context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+ Destroyer destroyer = new HdfsToDestroyer();
+ destroyer.destroy(new DestroyerContext(context, false, null), linkConfig, jobConfig);
+
+ File[] files = targetDir.listFiles();
+
+ // We should see no files in the target directory
+ assertNotNull(files);
+ assertEquals(files.length, 0);
+
+ // And target directory should not exists
+ assertFalse(workDir.exists());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 1daa25a..914c3ca 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -28,11 +28,32 @@ import org.testng.annotations.Test;
import java.io.File;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
/**
*
*/
public class TestToInitializer extends TestHdfsBase {
+ @Test
+ public void testWorkDirectoryBeingSet() {
+ final String TARGET_DIR = "/target/directory";
+
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+ jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
+
+ InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+ Initializer initializer = new HdfsToInitializer();
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+ assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(TARGET_DIR + "/."));
+ }
+
@Test(expectedExceptions = SqoopException.class)
public void testOutputDirectoryIsAFile() throws Exception {
File file = File.createTempFile("MastersOfOrion", ".txt");