Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/03/18 22:36:52 UTC

sqoop git commit: SQOOP-2228: Sqoop2: HDFS Connector: Import data to a temporary directory before moving it to the target directory

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 f98fc2885 -> a9f7b3ddd


SQOOP-2228: Sqoop2: HDFS Connector: Import data to a temporary directory before moving it to the target directory

(Jarek Jarcec Cecho via Abraham Elmahrek)
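
The change follows a stage-then-commit pattern: the TO initializer creates a
hidden working directory under the target directory, the loader writes all
part files there, and the TO destroyer either promotes them to the target
directory (on success) or discards them (on failure). A minimal, standalone
sketch of that pattern follows; the class name and paths are illustrative
assumptions, not Sqoop code, and only standard Hadoop FileSystem calls are
used.

    import java.io.IOException;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch only -- not part of the Sqoop code base.
    public class StagedHdfsWrite {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path target = new Path("/tmp/example/output");             // hypothetical target directory
        Path workDir = new Path(target, "." + UUID.randomUUID());  // hidden staging directory

        fs.mkdirs(workDir);
        boolean success = false;
        try {
          // ... job writes its part files under workDir instead of target ...
          success = true;
        } finally {
          if (success) {
            // Commit: move every staged file into the target directory
            for (FileStatus status : fs.listStatus(workDir)) {
              fs.rename(status.getPath(), new Path(target, status.getPath().getName()));
            }
          }
          // In both cases remove the staging directory, discarding partial
          // output when the job failed
          fs.delete(workDir, true);
        }
      }
    }

With this layout a reader of the target directory never observes partial
output from a failed job; at most it sees the hidden ".<uuid>" staging
directory, which the destroyer removes.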


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a9f7b3dd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a9f7b3dd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a9f7b3dd

Branch: refs/heads/sqoop2
Commit: a9f7b3ddd0b00557901dd2268b66b799a4f1c684
Parents: f98fc28
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Mar 18 14:36:05 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Mar 18 14:36:05 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |   2 +
 .../sqoop/connector/hdfs/HdfsConstants.java     |   3 +
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |   2 +-
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   |  45 ++++++--
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  23 ++--
 .../apache/sqoop/connector/hdfs/TestLoader.java |   4 +-
 .../sqoop/connector/hdfs/TestToDestroyer.java   | 114 +++++++++++++++++++
 .../sqoop/connector/hdfs/TestToInitializer.java |  21 ++++
 8 files changed, 196 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index c85e7fc..6cd66cc 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -36,6 +36,8 @@ public enum HdfsConnectorError implements ErrorCode{
 
   GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
 
+  GENERIC_HDFS_CONNECTOR_0008("Error occurred during destroyer run"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index bd74bec..9d20a79 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -30,4 +30,7 @@ public final class HdfsConstants extends Constants {
 
   public static final char DEFAULT_RECORD_DELIMITER = '\n';
 
+  public static final String PREFIX = "org.apache.sqoop.connector.hdfs.";
+
+  public static final String WORK_DIRECTORY = PREFIX + "work_dir";
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 0ced6d0..96913e8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -57,7 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
     HdfsUtils.contextToConfiguration(context.getContext(), conf);
 
     DataReader reader = context.getDataReader();
-    String directoryName = toJobConfig.toJobConfig.outputDirectory;
+    String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
     String codecname = getCompressionCodecName(toJobConfig);
 
     CompressionCodec codec = null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 3c85be8..11b2ae3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -17,22 +17,51 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
+import java.io.IOException;
+
 public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsToDestroyer.class);
+
   /**
-   * Callback to clean up after job execution.
-   *
-   * @param context Destroyer context
-   * @param linkConfig link configuration object
-   * @param jobConfig TO job configuration object
+   * {@inheritDoc}
    */
   @Override
-  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
-      ToJobConfiguration jobConfig) {
-    // do nothing at this point
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+    Configuration configuration = new Configuration();
+    HdfsUtils.contextToConfiguration(context.getContext(), configuration);
+
+    String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY);
+    Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
+
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+
+      // On success, move every file from the working directory into the target directory
+      if(context.isSuccess()) {
+        FileStatus[] fileStatuses = fs.listStatus(new Path(workingDirectory));
+        for (FileStatus status : fileStatuses) {
+          LOG.info("Committing file: " + status.getPath().toString() + " of size " + status.getLen());
+          fs.rename(status.getPath(), new Path(targetDirectory, status.getPath().getName()));
+        }
+      }
+
+      // Remove the working directory; on failure this also discards any partial output
+      fs.delete(new Path(workingDirectory), true);
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0008, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 83bac27..05ceb23 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -29,19 +30,22 @@ import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
 import java.io.IOException;
+import java.util.UUID;
 
 public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(HdfsToInitializer.class);
+
   /**
-   * Initialize new submission based on given configuration properties. Any
-   * needed temporary values might be saved to context object and they will be
-   * promoted to all other part of the workflow automatically.
-   *
-   * @param context Initializer context object
-   * @param linkConfig link configuration object
-   * @param jobConfig TO job configuration object
+   * {@inheritDoc}
    */
   @Override
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+    assert jobConfig != null;
+    assert linkConfig != null;
+    assert jobConfig.toJobConfig != null;
+    assert jobConfig.toJobConfig.outputDirectory != null;
+
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
@@ -65,5 +69,10 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
     } catch (IOException e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
     }
+
+    // Build a unique, hidden working directory under the target directory
+    String workingDirectory = jobConfig.toJobConfig.outputDirectory + "/." + UUID.randomUUID();
+    LOG.info("Using working directory: " + workingDirectory);
+    context.getContext().setString(HdfsConstants.WORK_DIRECTORY, workingDirectory);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index b7c81ec..3b81715 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -103,6 +103,7 @@ public class TestLoader extends TestHdfsBase {
         .addColumn(new Text("col3"));
 
     Configuration conf = new Configuration();
+    conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
       private long index = 0L;
@@ -128,7 +129,6 @@ public class TestLoader extends TestHdfsBase {
     }, null);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
-    jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     Path outputPath = new Path(outputDirectory);
@@ -157,6 +157,7 @@ public class TestLoader extends TestHdfsBase {
         .addColumn(new Text("col4"));
 
     Configuration conf = new Configuration();
+    conf.set("org.apache.sqoop.job.connector.from.context." + HdfsConstants.WORK_DIRECTORY, outputDirectory);
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
       private long index = 0L;
@@ -187,7 +188,6 @@ public class TestLoader extends TestHdfsBase {
     }, schema);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
-    jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     jobConf.toJobConfig.overrideNullValue = true;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
new file mode 100644
index 0000000..e1f416e
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for {@link HdfsToDestroyer}.
+ */
+public class TestToDestroyer {
+
+  @Test
+  public void testDestroyOnSuccess() throws Exception {
+    File workDir = Files.createTempDir();
+    File targetDir = Files.createTempDir();
+
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+    MutableContext context = new MutableMapContext();
+    context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+    Destroyer destroyer = new HdfsToDestroyer();
+    destroyer.destroy(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+
+    File[] files = targetDir.listFiles();
+
+    // We should see three files in the target directory
+    assertNotNull(files);
+    assertEquals(files.length, 3);
+
+    // With expected file names
+    boolean f1 = false, f2 = false, f3 = false;
+    for(File f : files) {
+      if(f.getName().startsWith("part-01-")) {
+        f1 = true;
+      }
+      if(f.getName().startsWith("part-02-")) {
+        f2 = true;
+      }
+      if(f.getName().startsWith("part-03-")) {
+        f3 = true;
+      }
+    }
+    assertTrue(f1);
+    assertTrue(f2);
+    assertTrue(f3);
+
+    // And the work directory should no longer exist
+    assertFalse(workDir.exists());
+  }
+
+  @Test
+  public void testDestroyOnFailure() throws Exception {
+    File workDir = Files.createTempDir();
+    File targetDir = Files.createTempDir();
+
+    File.createTempFile("part-01-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-02-", ".txt", workDir).createNewFile();
+    File.createTempFile("part-03-", ".txt", workDir).createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    jobConfig.toJobConfig.outputDirectory = targetDir.getAbsolutePath();
+
+    MutableContext context = new MutableMapContext();
+    context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
+
+    Destroyer destroyer = new HdfsToDestroyer();
+    destroyer.destroy(new DestroyerContext(context, false, null), linkConfig, jobConfig);
+
+    File[] files = targetDir.listFiles();
+
+    // We should see no files in the target directory
+    assertNotNull(files);
+    assertEquals(files.length, 0);
+
+    // And the work directory should no longer exist
+    assertFalse(workDir.exists());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a9f7b3dd/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index 1daa25a..914c3ca 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -28,11 +28,32 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 /**
  *
  */
 public class TestToInitializer extends TestHdfsBase {
 
+  @Test
+  public void testWorkDirectoryBeingSet() {
+    final String TARGET_DIR = "/target/directory";
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(TARGET_DIR + "/."));
+  }
+
   @Test(expectedExceptions = SqoopException.class)
   public void testOutputDirectoryIsAFile() throws Exception {
     File file = File.createTempFile("MastersOfOrion", ".txt");