You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/09/09 10:34:13 UTC

git commit: SQOOP-1182: Expose compression options for Sqoop2 import

Updated Branches:
  refs/heads/sqoop2 894d21ded -> 08a829fd6


SQOOP-1182: Expose compression options for Sqoop2 import

(Raghav Kumar Gautam via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/08a829fd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/08a829fd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/08a829fd

Branch: refs/heads/sqoop2
Commit: 08a829fd6ca8476280066e973311fb346664d62b
Parents: 894d21d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Sep 9 01:33:16 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Sep 9 01:33:16 2013 -0700

----------------------------------------------------------------------
 .../configuration/OutputCompression.java        | 32 ++++++++
 .../framework/configuration/OutputForm.java     |  2 +
 .../resources/framework-resources.properties    |  3 +
 .../mapreduce/MapreduceExecutionEngine.java     | 29 +++++++
 .../mapreduce/MapreduceExecutionEngineTest.java | 86 ++++++++++++++++++++
 5 files changed, 152 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
new file mode 100644
index 0000000..3b5ffc5
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+/**
+ * Compressions selectable in the output form; notes name the codec class used.
+ */
+public enum OutputCompression {
+  NONE, // MapreduceExecutionEngine configures no codec for this value
+  DEFAULT, // mapped to org.apache.hadoop.io.compress.DefaultCodec
+  DEFLATE, // mapped to org.apache.hadoop.io.compress.DeflateCodec
+  GZIP, // mapped to org.apache.hadoop.io.compress.GzipCodec
+  BZIP2, // mapped to org.apache.hadoop.io.compress.BZip2Codec
+  LZO, // mapped to com.hadoop.compression.lzo.LzoCodec
+  LZ4, // mapped to org.apache.hadoop.io.compress.Lz4Codec
+  SNAPPY, // mapped to org.apache.hadoop.io.compress.SnappyCodec
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
index 3cb9499..18eeab3 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
@@ -30,5 +30,7 @@ public class OutputForm {
 
   @Input public OutputFormat outputFormat;
 
+  @Input public OutputCompression compression;
+
   @Input(size = 255) public String outputDirectory;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/resources/framework-resources.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties
index cebc90e..a3d3330 100644
--- a/core/src/main/resources/framework-resources.properties
+++ b/core/src/main/resources/framework-resources.properties
@@ -38,6 +38,9 @@ output.storageType.help = Target on Hadoop ecosystem where to store data
 output.outputFormat.label = Output format
 output.outputFormat.help = Format in which data should be serialized
 
+output.compression.label = Compression format
+output.compression.help = Compression that should be used for the data
+
 output.outputDirectory.label = Output directory
 output.outputDirectory.help = Output directory for final data
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 767080c..392007d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -96,6 +96,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
       throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
         "Format: " + jobConf.output.outputFormat);
     }
+    if(getCompressionCodecName(jobConf) != null) {
+      context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
+        getCompressionCodecName(jobConf));
+      context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+    }
+  }
+
+  /**
+   * Resolve the class name of the codec for the configured output compression.
+   *
+   * @param jobConf import job configuration carrying the output form
+   * @return codec class name; null when compression is unset or NONE
+   */
+  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
+    if (jobConf.output.compression != null) {
+      switch (jobConf.output.compression) {
+        case DEFAULT: return "org.apache.hadoop.io.compress.DefaultCodec";
+        case DEFLATE: return "org.apache.hadoop.io.compress.DeflateCodec";
+        case GZIP: return "org.apache.hadoop.io.compress.GzipCodec";
+        case BZIP2: return "org.apache.hadoop.io.compress.BZip2Codec";
+        case LZO: return "com.hadoop.compression.lzo.LzoCodec";
+        case LZ4: return "org.apache.hadoop.io.compress.Lz4Codec";
+        case SNAPPY: return "org.apache.hadoop.io.compress.SnappyCodec";
+        case NONE:
+        default:
+          break;
+      }
+    }
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
new file mode 100644
index 0000000..19f5a22
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+import org.apache.sqoop.framework.configuration.OutputCompression;
+import org.apache.sqoop.framework.configuration.OutputFormat;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+public class MapreduceExecutionEngineTest {
+  @Test
+  public void testImportCompression() throws Exception {
+    testImportCompressionInner(OutputCompression.NONE,
+      null, false);
+
+    testImportCompressionInner(OutputCompression.DEFAULT,
+      "org.apache.hadoop.io.compress.DefaultCodec", true);
+
+    testImportCompressionInner(OutputCompression.GZIP,
+      "org.apache.hadoop.io.compress.GzipCodec", true);
+
+    testImportCompressionInner(OutputCompression.BZIP2,
+      "org.apache.hadoop.io.compress.BZip2Codec", true);
+
+    testImportCompressionInner(OutputCompression.LZO,
+      "com.hadoop.compression.lzo.LzoCodec", true);
+
+    testImportCompressionInner(OutputCompression.LZ4,
+      "org.apache.hadoop.io.compress.Lz4Codec", true);
+
+    testImportCompressionInner(OutputCompression.SNAPPY,
+      "org.apache.hadoop.io.compress.SnappyCodec", true);
+
+    testImportCompressionInner(null,
+      null, false);
+  }
+
+  private void testImportCompressionInner(OutputCompression comprssionFormat,
+    String expectedCodecName, boolean expectedCompressionFlag) {
+    MapreduceExecutionEngine executionEngine = new MapreduceExecutionEngine();
+    SubmissionRequest request = executionEngine.createSubmissionRequest();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
+    jobConf.output.compression = comprssionFormat;
+    request.setConfigFrameworkJob(jobConf);
+    request.setConnectorCallbacks(new Importer(Initializer.class,
+      Partitioner.class, Extractor.class, Destroyer.class) {
+    });
+    executionEngine.prepareImportSubmission(request);
+
+    MutableMapContext context = request.getFrameworkContext();
+    final String obtainedCodecName = context.getString(
+      JobConstants.HADOOP_COMPRESS_CODEC);
+    final boolean obtainedCodecFlag =
+      context.getBoolean(JobConstants.HADOOP_COMPRESS, false);
+    assertEquals("Unexpected codec name was returned", obtainedCodecName,
+      expectedCodecName);
+    assertEquals("Unexpected codec flag was returned", obtainedCodecFlag,
+      expectedCompressionFlag);
+  }
+}