You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bo...@apache.org on 2018/07/20 07:42:19 UTC
sqoop git commit: SQOOP-3329: Remove Kite dependency from the Sqoop
project
Repository: sqoop
Updated Branches:
refs/heads/trunk 17461e91d -> 739bbce48
SQOOP-3329: Remove Kite dependency from the Sqoop project
(Szabolcs Vasas via Boglarka Egyed)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/739bbce4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/739bbce4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/739bbce4
Branch: refs/heads/trunk
Commit: 739bbce48593a82575435f1cc48ca7ebd48537c9
Parents: 17461e9
Author: Boglarka Egyed <bo...@apache.org>
Authored: Fri Jul 20 09:36:39 2018 +0200
Committer: Boglarka Egyed <bo...@apache.org>
Committed: Fri Jul 20 09:36:39 2018 +0200
----------------------------------------------------------------------
ivy.xml | 10 +-
ivy/libraries.properties | 3 +-
src/docs/user/hive-notes.txt | 8 -
src/docs/user/import.txt | 39 +---
src/java/org/apache/sqoop/SqoopOptions.java | 4 +-
.../ParquetJobConfiguratorImplementation.java | 3 +-
.../parquet/kite/KiteMergeParquetReducer.java | 36 ---
.../kite/KiteParquetExportJobConfigurator.java | 51 -----
.../parquet/kite/KiteParquetExportMapper.java | 37 ----
.../kite/KiteParquetImportJobConfigurator.java | 98 ---------
.../parquet/kite/KiteParquetImportMapper.java | 55 -----
.../kite/KiteParquetJobConfiguratorFactory.java | 45 ----
.../kite/KiteParquetMergeJobConfigurator.java | 103 ---------
.../parquet/kite/KiteParquetUtils.java | 217 -------------------
.../org/apache/sqoop/tool/BaseSqoopTool.java | 10 -
src/test/org/apache/sqoop/TestMerge.java | 15 +-
.../org/apache/sqoop/TestParquetExport.java | 47 ----
.../org/apache/sqoop/TestParquetImport.java | 42 +---
.../org/apache/sqoop/hive/TestHiveImport.java | 159 --------------
.../apache/sqoop/tool/TestBaseSqoopTool.java | 5 +-
20 files changed, 20 insertions(+), 967 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1f587f3..796ef70 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -114,15 +114,7 @@ under the License.
conf="common->default;redist->default"/>
<dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
conf="common->default;redist->default"/>
- <dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data.version}"
- conf="common->default;redist->default">
- <exclude org="org.apache.avro" module="avro" />
- </dependency>
- <dependency org="org.kitesdk" name="kite-data-hive" rev="${kite-data.version}"
- conf="common->default;redist->default">
- <exclude org="com.twitter" module="parquet-hive-bundle"/>
- <exclude org="org.apache.avro" module="avro" />
- </dependency>
+ <dependency org="com.twitter" name="parquet-avro" rev="${parquet.version}" conf="common->default;redist->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="${jackson-databind.version}"
conf="common->default;redist->default" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 565a8bf..c506ca8 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -20,8 +20,6 @@
avro.version=1.8.1
-kite-data.version=1.1.0
-
checkstyle.version=5.0
commons-cli.version=1.2
@@ -62,3 +60,4 @@ hbase.version=1.2.4
hcatalog.version=1.2.1
jackson-databind.version=2.9.5
+parquet.version=1.6.0
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/docs/user/hive-notes.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt
index af97d94..d58c4d6 100644
--- a/src/docs/user/hive-notes.txt
+++ b/src/docs/user/hive-notes.txt
@@ -28,11 +28,3 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to
+STRING+ in Hive. The +NUMERIC+ and +DECIMAL+ SQL types will be coerced to
+DOUBLE+. In these cases, Sqoop will emit a warning in its log messages
informing you of the loss of precision.
-
-Parquet Support in Hive
-~~~~~~~~~~~~~~~~~~~~~~~
-
-When using the Kite Dataset API based Parquet implementation in order to contact the Hive MetaStore
-from a MapReduce job, a delegation token will be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
-Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
-format may not work.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/docs/user/import.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index a2c16d9..79f7101 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -60,7 +60,7 @@ Argument Description
+\--as-sequencefile+ Imports data to SequenceFiles
+\--as-textfile+ Imports data as plain text (default)
+\--as-parquetfile+ Imports data to Parquet Files
-+\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported values: kite, hadoop.
++\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported value: hadoop.
+\--boundary-query <statement>+ Boundary query to use for creating splits
+\--columns <col,col,col...>+ Columns to import from table
+\--delete-target-dir+ Delete the import target directory\
@@ -448,35 +448,14 @@ and Avro files.
Parquet support
+++++++++++++++
-Sqoop has two different implementations for importing data in Parquet format:
+Sqoop now has only one implementation for importing data in Parquet format, which is based on the Parquet Hadoop API.
+Note that the legacy Kite Dataset API based implementation has been removed, so users have to make sure that both the
++\--parquet-configurator-implementation+ option and the +parquetjob.configurator.implementation+ property are either unset or
+set to "hadoop".
-- Kite Dataset API based implementation (default, legacy)
-- Parquet Hadoop API based implementation (recommended)
-
-The users can specify the desired implementation with the +\--parquet-configurator-implementation+ option:
-
-----
-$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation kite
-----
-
-----
-$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation hadoop
-----
-
-If the +\--parquet-configurator-implementation+ option is not present then Sqoop will check the value of +parquetjob.configurator.implementation+
-property (which can be specified using -D in the Sqoop command or in the site.xml). If that value is also absent Sqoop will
-default to Kite Dataset API based implementation.
-
-The Kite Dataset API based implementation executes the import command on a different code
-path than the text import: it creates the Hive table based on the generated Avro schema by connecting to the Hive metastore.
-This can be a disadvantage since sometimes moving from the text file format to the Parquet file format can lead to many
-unexpected behavioral changes. Kite checks the Hive table schema before importing the data into it so if the user wants
-to import some data which has a schema incompatible with the Hive table's schema Sqoop will throw an error. This implementation
-uses snappy codec for compression by default and apart from this it supports the bzip codec too.
-
-The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the
-LOAD DATA INPATH command just like the text import does. Unlike Kite it also supports connecting to HiveServer2 (using the +\--hs2-url+ option)
-so it provides better security features. This implementation does not check the Hive table's schema before importing so
+The default Parquet import implementation builds the Hive CREATE TABLE statement and executes the
+LOAD DATA INPATH command just like the text import does. It supports connecting to HiveServer2 (using the +\--hs2-url+ option)
+but it does not check the Hive table's schema before importing so
it is possible that the user can successfully import data into Hive but they get an error during a Hive read operation later.
It does not use any compression by default but supports snappy and bzip codecs.
@@ -487,6 +466,8 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-par
--parquet-configurator-implementation hadoop --hs2-url "jdbc:hive2://hs2.foo.com:10000" --hs2-keytab "/path/to/keytab"
----
+Note that +\--parquet-configurator-implementation hadoop+ is now optional.
+
Enabling Logical Types in Avro and Parquet import for numbers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index cc1b752..f97dbfd 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -53,7 +53,7 @@ import org.apache.sqoop.util.RandomHash;
import org.apache.sqoop.util.StoredAsProperty;
import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;
/**
@@ -1161,7 +1161,7 @@ public class SqoopOptions implements Cloneable {
// set escape column mapping to true
this.escapeColumnMappingEnabled = true;
- this.parquetConfiguratorImplementation = KITE;
+ this.parquetConfiguratorImplementation = HADOOP;
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
index 050c854..c6b576d 100644
--- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
@@ -19,14 +19,13 @@
package org.apache.sqoop.mapreduce.parquet;
import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
/**
* An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}.
* The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects.
*/
public enum ParquetJobConfiguratorImplementation {
- KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class);
+ HADOOP(HadoopParquetJobConfiguratorFactory.class);
private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
deleted file mode 100644
index 02816d7..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.mapreduce.MergeParquetReducer;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API.
- */
-public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
-
- @Override
- protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
- context.write(record, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
deleted file mode 100644
index 6ebc5a3..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
-
- @Override
- public void configureInputFormat(Job job, Path inputPath) throws IOException {
- String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
- DatasetKeyInputFormat.configure(job).readFrom(uri);
- }
-
- @Override
- public Class<? extends Mapper> getMapperClass() {
- return KiteParquetExportMapper.class;
- }
-
- @Override
- public Class<? extends InputFormat> getInputFormatClass() {
- return DatasetKeyInputFormat.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
deleted file mode 100644
index 122ff3f..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API.
- */
-public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
-
- @Override
- protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
- context.write(toSqoopRecord(key), NullWritable.get());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
deleted file mode 100644
index 7e179a2..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.SqoopOptions;
-import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
-
- public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
-
- @Override
- public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
- JobConf conf = (JobConf) job.getConfiguration();
- String uri = getKiteUri(conf, options, tableName, destination);
- KiteParquetUtils.WriteMode writeMode;
-
- if (options.doHiveImport()) {
- if (options.doOverwriteHiveTable()) {
- writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
- } else {
- writeMode = KiteParquetUtils.WriteMode.APPEND;
- if (Datasets.exists(uri)) {
- LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
- "append data into the existing Hive table. Consider using " +
- "--hive-overwrite, if you do NOT intend to do appending.");
- }
- }
- } else {
- // Note that there is no such an import argument for overwriting HDFS
- // dataset, so overwrite mode is not supported yet.
- // Sqoop's append mode means to merge two independent datasets. We
- // choose DEFAULT as write mode.
- writeMode = KiteParquetUtils.WriteMode.DEFAULT;
- }
- KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
- }
-
- @Override
- public Class<? extends Mapper> getMapperClass() {
- return KiteParquetImportMapper.class;
- }
-
- @Override
- public Class<? extends OutputFormat> getOutputFormatClass() {
- return DatasetKeyOutputFormat.class;
- }
-
- @Override
- public boolean isHiveImportNeeded() {
- return false;
- }
-
- private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
- if (options.doHiveImport()) {
- String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
- options.getHiveDatabaseName();
- String hiveTable = options.getHiveTableName() == null ? tableName :
- options.getHiveTableName();
- return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
- } else {
- return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
deleted file mode 100644
index 0a91e4a..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.sqoop.avro.AvroUtil;
-import org.apache.sqoop.lib.LargeObjectLoader;
-import org.apache.sqoop.mapreduce.ParquetImportMapper;
-
-import java.io.IOException;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-
-/**
- * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API.
- */
-public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
-
- @Override
- protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
- return new LargeObjectLoader(conf, workPath);
- }
-
- @Override
- protected Schema getAvroSchema(Configuration configuration) {
- String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
- return AvroUtil.parseAvroSchema(schemaString);
- }
-
- @Override
- protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
- context.write(record, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
deleted file mode 100644
index bd07c09..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
-import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
-
-/**
- * A concrete factory implementation which produces configurator objects using the Kite Dataset API.
- */
-public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
-
- @Override
- public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
- return new KiteParquetImportJobConfigurator();
- }
-
- @Override
- public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
- return new KiteParquetExportJobConfigurator();
- }
-
- @Override
- public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
- return new KiteParquetMergeJobConfigurator();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
deleted file mode 100644
index ed045cd..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.sqoop.mapreduce.MergeParquetMapper;
-import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-
-/**
- * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
-
- public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
-
- @Override
- public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
- Path finalPath) throws IOException {
- try {
- FileSystem fileSystem = finalPath.getFileSystem(conf);
- LOG.info("Trying to merge parquet files");
- job.setOutputKeyClass(GenericRecord.class);
- job.setMapperClass(MergeParquetMapper.class);
- job.setReducerClass(KiteMergeParquetReducer.class);
- job.setOutputValueClass(NullWritable.class);
-
- List<Footer> footers = new ArrayList<Footer>();
- FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
- FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
- footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
- footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
-
- MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
- AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
- Schema avroSchema = avroSchemaConverter.convert(schema);
-
- if (!fileSystem.exists(finalPath)) {
- Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
- DatasetKeyOutputFormat.configure(job).overwrite(dataset);
- } else {
- DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
- }
-
- job.setInputFormatClass(AvroParquetInputFormat.class);
- AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
-
- conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
- Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
-
- job.setOutputFormatClass(outClass);
- } catch (Exception cnfe) {
- throw new IOException(cnfe);
- }
- }
-
- public static Dataset createDataset(Schema schema, String uri) {
- DatasetDescriptor descriptor =
- new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
- return Datasets.create(uri, descriptor, GenericRecord.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
deleted file mode 100644
index a4768c9..0000000
--- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.hive.HiveConfig;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import org.kitesdk.data.spi.SchemaValidationUtil;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
-
-/**
- * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job.
- */
-public final class KiteParquetUtils {
-
- public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
-
- public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
-
- public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
- // Purposefully choosing the same token alias as the one Oozie chooses.
- // Make sure we don't generate a new delegation token if oozie
- // has already generated one.
- public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
-
- public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
-
- public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
- "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
- " but it is possible that date/timestamp types were mapped to strings during table" +
- " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
- " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
-
- private static final String HIVE_URI_PREFIX = "dataset:hive";
-
- private KiteParquetUtils() {
- }
-
- public enum WriteMode {
- DEFAULT, APPEND, OVERWRITE
- };
-
- public static CompressionType getCompressionType(Configuration conf) {
- CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
- String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
- try {
- return CompressionType.forName(codec);
- } catch (IllegalArgumentException ex) {
- LOG.warn(String.format(
- "Unsupported compression type '%s'. Fallback to '%s'.",
- codec, defaults));
- }
- return defaults;
- }
-
- /**
- * Configure the import job. The import process will use a Kite dataset to
- * write data records into Parquet format internally. The input key class is
- * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
- * {@link org.apache.avro.generic.GenericRecord}.
- */
- public static void configureImportJob(JobConf conf, Schema schema,
- String uri, WriteMode writeMode) throws IOException {
- Dataset dataset;
-
- // Add hive delegation token only if we don't already have one.
- if (isHiveImport(uri)) {
- Configuration hiveConf = HiveConfig.getHiveConf(conf);
- if (isSecureMetastore(hiveConf)) {
- // Copy hive configs to job config
- HiveConfig.addHiveConfigs(hiveConf, conf);
-
- if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
- addHiveDelegationToken(conf);
- }
- }
- }
-
- if (Datasets.exists(uri)) {
- if (WriteMode.DEFAULT.equals(writeMode)) {
- throw new IOException("Destination exists! " + uri);
- }
-
- dataset = Datasets.load(uri);
- Schema writtenWith = dataset.getDescriptor().getSchema();
- if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
- String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
- throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
- }
- } else {
- dataset = createDataset(schema, getCompressionType(conf), uri);
- }
- conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
-
- DatasetKeyOutputFormat.ConfigBuilder builder =
- DatasetKeyOutputFormat.configure(conf);
- if (WriteMode.OVERWRITE.equals(writeMode)) {
- builder.overwrite(dataset);
- } else if (WriteMode.APPEND.equals(writeMode)) {
- builder.appendTo(dataset);
- } else {
- builder.writeTo(dataset);
- }
- }
-
- private static boolean isHiveImport(String importUri) {
- return importUri.startsWith(HIVE_URI_PREFIX);
- }
-
- public static Dataset createDataset(Schema schema,
- CompressionType compressionType, String uri) {
- DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
- .schema(schema)
- .format(Formats.PARQUET)
- .compressionType(compressionType)
- .build();
- return Datasets.create(uri, descriptor, GenericRecord.class);
- }
-
- private static boolean isSecureMetastore(Configuration conf) {
- return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
- }
-
- /**
- * Add hive delegation token to credentials store.
- * @param conf
- */
- private static void addHiveDelegationToken(JobConf conf) {
- // Need to use reflection since there's no compile time dependency on the client libs.
- Class<?> HiveConfClass;
- Class<?> HiveMetaStoreClientClass;
-
- try {
- HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
- } catch (ClassNotFoundException ex) {
- LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
- + " when adding hive delegation token. "
- + "Make sure HIVE_CONF_DIR is set correctly.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
-
- try {
- HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
- } catch (ClassNotFoundException ex) {
- LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
- + " when adding hive delegation token."
- + " Make sure HIVE_CONF_DIR is set correctly.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
-
- try {
- Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
- HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
- );
- // getDelegationToken(String kerberosPrincial)
- Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
- Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
-
- // Load token
- Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
- metastoreToken.decodeFromUrlString(tokenStringForm.toString());
- conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
-
- LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
- } catch (Exception ex) {
- LOG.error("Couldn't fetch delegation token.", ex);
- throw new RuntimeException("Couldn't fetch delegation token.", ex);
- }
- }
-
- private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
- String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
-
- if (hiveImport) {
- exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
- }
-
- return exceptionMessage;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 87fc5e9..9dcbdd5 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -21,7 +21,6 @@ package org.apache.sqoop.tool;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
import java.io.File;
@@ -1587,15 +1586,6 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
+ "importing into SequenceFile format.");
}
- // Hive import and create hive table not compatible for ParquetFile format when using Kite
- if (options.doHiveImport()
- && options.doFailIfHiveTableExists()
- && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
- && options.getParquetConfiguratorImplementation() == KITE) {
- throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
- + "importing into ParquetFile format using Kite.");
- }
-
if (options.doHiveImport()
&& options.getIncrementalMode().equals(IncrementalMode.DateLastModified)) {
throw new InvalidOptionsException(HIVE_IMPORT_WITH_LASTMODIFIED_NOT_SUPPORTED);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java
index 2b3280a..b283174 100644
--- a/src/test/org/apache/sqoop/TestMerge.java
+++ b/src/test/org/apache/sqoop/TestMerge.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.manager.ConnManager;
@@ -54,8 +53,6 @@ import org.apache.sqoop.util.ParquetReader;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.junit.Assert.fail;
/**
@@ -84,8 +81,6 @@ public class TestMerge extends BaseSqoopTestCase {
Arrays.asList(new Integer(1), new Integer(43)),
Arrays.asList(new Integer(3), new Integer(313)));
- private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE;
-
@Before
public void setUp() {
super.setUp();
@@ -118,7 +113,6 @@ public class TestMerge extends BaseSqoopTestCase {
public SqoopOptions getSqoopOptions(Configuration conf) {
SqoopOptions options = new SqoopOptions(conf);
options.setConnectString(HsqldbTestServer.getDbUrl());
- options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation);
return options;
}
@@ -164,14 +158,7 @@ public class TestMerge extends BaseSqoopTestCase {
}
@Test
- public void testParquetFileMergeHadoop() throws Exception {
- parquetJobConfiguratorImplementation = HADOOP;
- runMergeTest(SqoopOptions.FileLayout.ParquetFile);
- }
-
- @Test
- public void testParquetFileMergeKite() throws Exception {
- parquetJobConfiguratorImplementation = KITE;
+ public void testParquetFileMerge() throws Exception {
runMergeTest(SqoopOptions.FileLayout.ParquetFile);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestParquetExport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestParquetExport.java b/src/test/org/apache/sqoop/TestParquetExport.java
index 0fab188..be1d816 100644
--- a/src/test/org/apache/sqoop/TestParquetExport.java
+++ b/src/test/org/apache/sqoop/TestParquetExport.java
@@ -18,9 +18,6 @@
package org.apache.sqoop;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.testutil.ExportJobTestCase;
import com.google.common.collect.Lists;
@@ -32,8 +29,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import parquet.avro.AvroParquetWriter;
import java.io.IOException;
@@ -44,7 +39,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -58,23 +52,11 @@ import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
/**
* Test that we can export Parquet Data Files from HDFS into databases.
*/
-@RunWith(Parameterized.class)
public class TestParquetExport extends ExportJobTestCase {
- @Parameterized.Parameters(name = "parquetImplementation = {0}")
- public static Iterable<? extends Object> parquetImplementationParameters() {
- return Arrays.asList("kite", "hadoop");
- }
-
@Rule
public ExpectedException thrown = ExpectedException.none();
- private final String parquetImplementation;
-
- public TestParquetExport(String parquetImplementation) {
- this.parquetImplementation = parquetImplementation;
- }
-
/**
* @return an argv for the CodeGenTool to use when creating tables to export.
*/
@@ -144,8 +126,6 @@ public class TestParquetExport extends ExportJobTestCase {
/**
* Create a data file that gets exported to the db.
- * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
- * but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
* @param numRecords how many records to write to the file.
*/
protected void createParquetFile(int numRecords,
@@ -153,7 +133,6 @@ public class TestParquetExport extends ExportJobTestCase {
Schema schema = buildSchema(extraCols);
- createMetadataDir(schema);
String fileName = UUID.randomUUID().toString() + ".parquet";
Path filePath = new Path(getTablePath(), fileName);
try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
@@ -167,25 +146,6 @@ public class TestParquetExport extends ExportJobTestCase {
}
}
- private void createMetadataDir(Schema schema) throws IOException {
- final String descriptorFileTemplate = "location=file\\:%s\n" +
- " version=1\n" +
- " compressionType=snappy\n" +
- " format=parquet\n";
- Path metadataDirPath = new Path(getTablePath(), ".metadata");
- Path schemaFile = new Path(metadataDirPath, "schema.avsc");
- Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
- FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
- fileSystem.mkdirs(metadataDirPath);
-
- try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
- fileOs.write(schema.toString().getBytes());
- }
- try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
- fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
- }
- }
-
private Schema buildSchema(ColumnGenerator... extraCols) {
List<Field> fields = new ArrayList<Field>();
fields.add(buildField("id", Schema.Type.INT));
@@ -492,11 +452,4 @@ public class TestParquetExport extends ExportJobTestCase {
thrown.reportMissingExceptionWithMessage("Expected Exception on missing Parquet fields");
runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
}
-
- @Override
- protected Configuration getConf() {
- Configuration conf = super.getConf();
- conf.set("parquetjob.configurator.implementation", parquetImplementation);
- return conf;
- }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestParquetImport.java b/src/test/org/apache/sqoop/TestParquetImport.java
index b1488e8..2810e31 100644
--- a/src/test/org/apache/sqoop/TestParquetImport.java
+++ b/src/test/org/apache/sqoop/TestParquetImport.java
@@ -18,7 +18,6 @@
package org.apache.sqoop;
-import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -31,9 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -48,32 +44,15 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
/**
* Tests --as-parquetfile.
*/
-@RunWith(Parameterized.class)
public class TestParquetImport extends ImportJobTestCase {
public static final Log LOG = LogFactory
.getLog(TestParquetImport.class.getName());
- private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
-
- private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";
-
- @Parameters(name = "parquetImplementation = {0}")
- public static Iterable<? extends Object> parquetImplementationParameters() {
- return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
- }
-
- private final String parquetImplementation;
-
- public TestParquetImport(String parquetImplementation) {
- this.parquetImplementation = parquetImplementation;
- }
-
/**
* Create the argv to pass to Sqoop.
*
@@ -136,27 +115,17 @@ public class TestParquetImport extends ImportJobTestCase {
}
@Test
- public void testHadoopGzipCompression() throws IOException {
- assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+ public void testGzipCompression() throws IOException {
runParquetImportTest("gzip");
}
- @Test
- public void testKiteDeflateCompression() throws IOException {
- assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));
- // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
- // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
- runParquetImportTest("deflate", "gzip");
- }
-
/**
* This test case is added to document that the deflate codec is not supported with
* the Hadoop Parquet implementation so Sqoop throws an exception when it is specified.
* @throws IOException
*/
@Test(expected = IOException.class)
- public void testHadoopDeflateCompression() throws IOException {
- assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+ public void testDeflateCompression() throws IOException {
runParquetImportTest("deflate");
}
@@ -334,11 +303,4 @@ public class TestParquetImport extends ImportJobTestCase {
assertEquals(Type.NULL, field.schema().getTypes().get(0).getType());
assertEquals(type, field.schema().getTypes().get(1).getType());
}
-
- @Override
- protected Configuration getConf() {
- Configuration conf = super.getConf();
- conf.set("parquetjob.configurator.implementation", parquetImplementation);
- return conf;
- }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/hive/TestHiveImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hive/TestHiveImport.java b/src/test/org/apache/sqoop/hive/TestHiveImport.java
index 436f0e5..a6c8e10 100644
--- a/src/test/org/apache/sqoop/hive/TestHiveImport.java
+++ b/src/test/org/apache/sqoop/hive/TestHiveImport.java
@@ -23,20 +23,12 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.List;
-import org.apache.sqoop.Sqoop;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
-import org.apache.sqoop.util.ParquetReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -54,7 +46,6 @@ import org.apache.sqoop.tool.SqoopTool;
import org.apache.commons.cli.ParseException;
import org.junit.rules.ExpectedException;
-import static java.util.Collections.sort;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -284,54 +275,6 @@ public class TestHiveImport extends ImportJobTestCase {
getArgv(false, null), new ImportTool());
}
- /** Test that strings and ints are handled in the normal fashion as parquet
- * file. */
- @Test
- public void testNormalHiveImportAsParquet() throws IOException {
- final String TABLE_NAME = "normal_hive_import_as_parquet";
- setCurTableName(TABLE_NAME);
- setNumCols(3);
- String [] types = getTypes();
- String [] vals = { "'test'", "42", "'somestring'" };
- String [] extraArgs = {"--as-parquetfile"};
-
- runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
- new ImportTool());
- verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
- }
-
- private void verifyHiveDataset(Object[][] valsArray) {
- List<String> expected = getExpectedLines(valsArray);
- List<String> result = new ParquetReader(getTablePath()).readAllInCsv();
-
- sort(expected);
- sort(result);
-
- assertEquals(expected, result);
- }
-
- private List<String> getExpectedLines(Object[][] valsArray) {
- List<String> expectations = new ArrayList<>();
- if (valsArray != null) {
- for (Object[] vals : valsArray) {
- expectations.add(toCsv(vals));
- }
- }
- return expectations;
- }
-
- private String toCsv(Object[] vals) {
- StringBuilder result = new StringBuilder();
-
- for (Object val : vals) {
- result.append(val).append(",");
- }
-
- result.deleteCharAt(result.length() - 1);
-
- return result.toString();
- }
-
/** Test that table is created in hive with no data import. */
@Test
public void testCreateOnlyHiveImport() throws IOException {
@@ -365,108 +308,6 @@ public class TestHiveImport extends ImportJobTestCase {
new CreateHiveTableTool());
}
- /**
- * Test that table is created in hive and replaces the existing table if
- * any.
- */
- @Test
- public void testCreateOverwriteHiveImportAsParquet() throws IOException {
- final String TABLE_NAME = "create_overwrite_hive_import_as_parquet";
- setCurTableName(TABLE_NAME);
- setNumCols(3);
- String [] types = getTypes();
- String [] vals = { "'test'", "42", "'somestring'" };
- String [] extraArgs = {"--as-parquetfile"};
- ImportTool tool = new ImportTool();
-
- runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
- verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
-
- String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
- String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
- runImportTest(TABLE_NAME, types, valsToOverwrite, "",
- getArgv(false, extraArgsForOverwrite), tool);
- verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
- }
-
- @Test
- public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
- final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
- setCurTableName(TABLE_NAME);
- setNumCols(3);
-
- String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
- String [] vals = { "'test'", "42", "'2009-12-31'" };
- String [] extraArgs = {"--as-parquetfile"};
-
- createHiveDataSet(TABLE_NAME);
-
- createTableWithColTypes(types, vals);
-
- thrown.expect(AvroSchemaMismatchException.class);
- thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
-
- SqoopOptions sqoopOptions = getSqoopOptions(getConf());
- sqoopOptions.setThrowOnError(true);
- Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
- sqoop.run(getArgv(false, extraArgs));
-
- }
-
- private void createHiveDataSet(String tableName) {
- Schema dataSetSchema = SchemaBuilder
- .record(tableName)
- .fields()
- .name(getColName(0)).type().nullable().stringType().noDefault()
- .name(getColName(1)).type().nullable().stringType().noDefault()
- .name(getColName(2)).type().nullable().stringType().noDefault()
- .endRecord();
- String dataSetUri = "dataset:hive:/default/" + tableName;
- KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
- }
-
- /**
- * Test that records are appended to an existing table.
- */
- @Test
- public void testAppendHiveImportAsParquet() throws IOException {
- final String TABLE_NAME = "append_hive_import_as_parquet";
- setCurTableName(TABLE_NAME);
- setNumCols(3);
- String [] types = getTypes();
- String [] vals = { "'test'", "42", "'somestring'" };
- String [] extraArgs = {"--as-parquetfile"};
- String [] args = getArgv(false, extraArgs);
- ImportTool tool = new ImportTool();
-
- runImportTest(TABLE_NAME, types, vals, "", args, tool);
- verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
-
- String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
- runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
- verifyHiveDataset(new Object[][] {
- {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
- }
-
- /**
- * Test hive create and --as-parquetfile options validation.
- */
- @Test
- public void testCreateHiveImportAsParquet() throws ParseException, InvalidOptionsException {
- final String TABLE_NAME = "CREATE_HIVE_IMPORT_AS_PARQUET";
- setCurTableName(TABLE_NAME);
- setNumCols(3);
- String [] extraArgs = {"--as-parquetfile", "--create-hive-table"};
- ImportTool tool = new ImportTool();
-
- thrown.expect(InvalidOptionsException.class);
- thrown.reportMissingExceptionWithMessage("Expected InvalidOptionsException during Hive table creation with " +
- "--as-parquetfile");
- tool.validateOptions(tool.parseArguments(getArgv(false, extraArgs), null,
- null, true));
- }
-
-
/** Test that dates are coerced properly to strings. */
@Test
public void testDate() throws IOException {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
index dbda8b7..5571b25 100644
--- a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
+++ b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
@@ -28,7 +28,6 @@ import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -124,7 +123,7 @@ public class TestBaseSqoopTool {
public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");
- exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
+ exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [HADOOP]");
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
}
@@ -132,6 +131,6 @@ public class TestBaseSqoopTool {
public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
- assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
+ assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
}
}