You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/09/09 10:34:13 UTC
git commit: SQOOP-1182: Expose compression options for Sqoop2 import
Updated Branches:
refs/heads/sqoop2 894d21ded -> 08a829fd6
SQOOP-1182: Expose compression options for Sqoop2 import
(Raghav Kumar Gautam via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/08a829fd
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/08a829fd
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/08a829fd
Branch: refs/heads/sqoop2
Commit: 08a829fd6ca8476280066e973311fb346664d62b
Parents: 894d21d
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Sep 9 01:33:16 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Sep 9 01:33:16 2013 -0700
----------------------------------------------------------------------
.../configuration/OutputCompression.java | 32 ++++++++
.../framework/configuration/OutputForm.java | 2 +
.../resources/framework-resources.properties | 3 +
.../mapreduce/MapreduceExecutionEngine.java | 29 +++++++
.../mapreduce/MapreduceExecutionEngineTest.java | 86 ++++++++++++++++++++
5 files changed, 152 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
new file mode 100644
index 0000000..3b5ffc5
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+/**
+ * Compression choices that can be selected for the output of an import job.
+ *
+ * NONE requests no compression codec; every other constant stands for one
+ * specific codec. The translation from a constant to the codec class name is
+ * done by the MapReduce execution engine, not by this enum.
+ */
+public enum OutputCompression {
+ // No compression codec is configured for the job.
+ NONE,
+ // Mapped to org.apache.hadoop.io.compress.DefaultCodec.
+ DEFAULT,
+ // Mapped to org.apache.hadoop.io.compress.DeflateCodec.
+ DEFLATE,
+ // Mapped to org.apache.hadoop.io.compress.GzipCodec.
+ GZIP,
+ // Mapped to org.apache.hadoop.io.compress.BZip2Codec.
+ BZIP2,
+ // Mapped to com.hadoop.compression.lzo.LzoCodec; note that this class is
+ // not in the org.apache.hadoop.io.compress package like the others.
+ LZO,
+ // Mapped to org.apache.hadoop.io.compress.Lz4Codec.
+ LZ4,
+ // Mapped to org.apache.hadoop.io.compress.SnappyCodec.
+ SNAPPY,
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
index 3cb9499..18eeab3 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
@@ -30,5 +30,7 @@ public class OutputForm {
@Input public OutputFormat outputFormat;
+ @Input public OutputCompression compression;
+
@Input(size = 255) public String outputDirectory;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/core/src/main/resources/framework-resources.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties
index cebc90e..a3d3330 100644
--- a/core/src/main/resources/framework-resources.properties
+++ b/core/src/main/resources/framework-resources.properties
@@ -38,6 +38,9 @@ output.storageType.help = Target on Hadoop ecosystem where to store data
output.outputFormat.label = Output format
output.outputFormat.help = Format in which data should be serialized
+output.compression.label = Compression format
+output.compression.help = Compression that should be used for the data
+
output.outputDirectory.label = Output directory
output.outputDirectory.help = Output directory for final data
http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 767080c..392007d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -96,6 +96,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
"Format: " + jobConf.output.outputFormat);
}
+ if(getCompressionCodecName(jobConf) != null) {
+ context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
+ getCompressionCodecName(jobConf));
+ context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+ }
+ }
+
+  /**
+   * Translates the compression selected on the import job into the class
+   * name of the codec that implements it.
+   *
+   * @param jobConf import job configuration whose output form is inspected
+   * @return fully qualified codec class name, or null when the compression
+   *         is unset, is NONE, or is not one of the values handled below
+   */
+  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
+    // Switching on a null enum reference would throw, so bail out first.
+    if (jobConf.output.compression == null) {
+      return null;
+    }
+
+    // Stays null unless one of the codec-backed constants matches.
+    String codecClassName = null;
+    switch (jobConf.output.compression) {
+      case DEFAULT:
+        codecClassName = "org.apache.hadoop.io.compress.DefaultCodec";
+        break;
+      case DEFLATE:
+        codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
+        break;
+      case GZIP:
+        codecClassName = "org.apache.hadoop.io.compress.GzipCodec";
+        break;
+      case BZIP2:
+        codecClassName = "org.apache.hadoop.io.compress.BZip2Codec";
+        break;
+      case LZO:
+        codecClassName = "com.hadoop.compression.lzo.LzoCodec";
+        break;
+      case LZ4:
+        codecClassName = "org.apache.hadoop.io.compress.Lz4Codec";
+        break;
+      case SNAPPY:
+        codecClassName = "org.apache.hadoop.io.compress.SnappyCodec";
+        break;
+      case NONE:
+      default:
+        // No codec: leave the result as null.
+        break;
+    }
+    return codecClassName;
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/08a829fd/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
new file mode 100644
index 0000000..19f5a22
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+import org.apache.sqoop.framework.configuration.OutputCompression;
+import org.apache.sqoop.framework.configuration.OutputFormat;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Tests for {@link MapreduceExecutionEngine}.
+ */
+public class MapreduceExecutionEngineTest {
+  /**
+   * Checks, for each output compression value (and for an unset value), which
+   * codec name and compression flag end up in the framework context after an
+   * import submission has been prepared.
+   */
+  @Test
+  public void testImportCompression() throws Exception {
+    testImportCompressionInner(OutputCompression.NONE,
+      null, false);
+
+    testImportCompressionInner(OutputCompression.DEFAULT,
+      "org.apache.hadoop.io.compress.DefaultCodec", true);
+
+    testImportCompressionInner(OutputCompression.DEFLATE,
+      "org.apache.hadoop.io.compress.DeflateCodec", true);
+
+    testImportCompressionInner(OutputCompression.GZIP,
+      "org.apache.hadoop.io.compress.GzipCodec", true);
+
+    testImportCompressionInner(OutputCompression.BZIP2,
+      "org.apache.hadoop.io.compress.BZip2Codec", true);
+
+    testImportCompressionInner(OutputCompression.LZO,
+      "com.hadoop.compression.lzo.LzoCodec", true);
+
+    testImportCompressionInner(OutputCompression.LZ4,
+      "org.apache.hadoop.io.compress.Lz4Codec", true);
+
+    testImportCompressionInner(OutputCompression.SNAPPY,
+      "org.apache.hadoop.io.compress.SnappyCodec", true);
+
+    // An unset compression must behave like NONE.
+    testImportCompressionInner(null,
+      null, false);
+  }
+
+  /**
+   * Prepares an import submission with the given compression and asserts on
+   * the resulting framework context.
+   *
+   * @param compressionFormat compression to put on the job's output form;
+   *        may be null to leave it unset
+   * @param expectedCodecName codec class name expected under
+   *        JobConstants.HADOOP_COMPRESS_CODEC, or null if none is expected
+   * @param expectedCompressionFlag value expected under
+   *        JobConstants.HADOOP_COMPRESS (read with a default of false)
+   */
+  private void testImportCompressionInner(OutputCompression compressionFormat,
+    String expectedCodecName, boolean expectedCompressionFlag) {
+    MapreduceExecutionEngine executionEngine = new MapreduceExecutionEngine();
+    SubmissionRequest request = executionEngine.createSubmissionRequest();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    jobConf.output.outputFormat = OutputFormat.TEXT_FILE;
+    jobConf.output.compression = compressionFormat;
+    request.setConfigFrameworkJob(jobConf);
+    request.setConnectorCallbacks(new Importer(Initializer.class,
+      Partitioner.class, Extractor.class, Destroyer.class) {
+    });
+    executionEngine.prepareImportSubmission(request);
+
+    MutableMapContext context = request.getFrameworkContext();
+    final String obtainedCodecName = context.getString(
+      JobConstants.HADOOP_COMPRESS_CODEC);
+    final boolean obtainedCodecFlag =
+      context.getBoolean(JobConstants.HADOOP_COMPRESS, false);
+    // assertEquals takes (message, expected, actual); keeping that order makes
+    // the failure message report the two values the right way round.
+    assertEquals("Unexpected codec name was returned", expectedCodecName,
+      obtainedCodecName);
+    assertEquals("Unexpected codec flag was returned", expectedCompressionFlag,
+      obtainedCodecFlag);
+  }
+}