Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/05 21:51:28 UTC
sqoop git commit: SQOOP-2788: Sqoop2: Parquet support for HdfsConnector
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 2f4da466e -> 55d1db2ba
SQOOP-2788: Sqoop2: Parquet support for HdfsConnector
(Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/55d1db2b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/55d1db2b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/55d1db2b
Branch: refs/heads/sqoop2
Commit: 55d1db2ba3cdd45e60daeb18ef5529f2be282f1f
Parents: 2f4da46
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Feb 5 12:51:08 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Feb 5 12:51:08 2016 -0800
----------------------------------------------------------------------
...pache.sqoop.connector-classloader.properties | 2 +
connector/connector-hdfs/pom.xml | 10 +
.../sqoop/connector/hdfs/HdfsExtractor.java | 87 ++++++++-
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 28 ++-
.../connector/hdfs/configuration/ToFormat.java | 5 +
.../hdfs/hdfsWriter/GenericHdfsWriter.java | 3 +-
.../hdfs/hdfsWriter/HdfsParquetWriter.java | 66 +++++++
.../hdfs/hdfsWriter/HdfsSequenceWriter.java | 5 +-
.../hdfs/hdfsWriter/HdfsTextWriter.java | 3 +-
.../apache/sqoop/connector/hdfs/TestLoader.java | 109 +++++++++--
.../sqoop/connector/common/SqoopAvroUtils.java | 3 +-
.../idf/AVROIntermediateDataFormat.java | 14 +-
pom.xml | 11 ++
test/pom.xml | 10 +
.../connector/hdfs/NullValueTest.java | 86 +++++++--
.../integration/connector/hdfs/ParquetTest.java | 183 +++++++++++++++++++
16 files changed, 577 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties
index c0082cc..0311f88 100644
--- a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties
+++ b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties
@@ -52,6 +52,8 @@ system.classes.default=java.,\
org.apache.log4j.,\
org.apache.sqoop.,\
-org.apache.sqoop.connector.,\
+ org.apache.avro.,\
+ org.codehaus.jackson.,\
org.xerial.snappy.,\
sqoop.properties,\
sqoop_bootstrap.properties
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 5996314..37cf3fa 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -73,6 +73,16 @@ limitations under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 9ef2a05..5973463 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -19,10 +19,14 @@ package org.apache.sqoop.connector.hdfs;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
@@ -33,13 +37,18 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
@@ -55,6 +64,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
+ // the sequence of bytes that appears at the beginning and end of every
+ // parquet file
+ private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+
private Configuration conf = new Configuration();
private DataWriter dataWriter;
private Schema schema;
@@ -85,7 +98,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
private void extractFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
Path file, long start, long length, String[] locations)
- throws IOException {
+ throws IOException, InterruptedException {
long end = start + length;
LOG.info("Extracting file " + file);
LOG.info("\t from offset " + start);
@@ -93,8 +106,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
LOG.info("\t of length " + length);
if(isSequenceFile(file)) {
extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
- } else {
- extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
+ } else if(isParquetFile(file)) {
+ extractParquetFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
+ } else {
+ extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
}
}
@@ -136,7 +151,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
@SuppressWarnings("resource")
private void extractTextFile(LinkConfiguration linkConfiguration,
FromJobConfiguration fromJobConfiguration,
- Path file, long start, long length, String[] locations)
+ Path file, long start, long length)
throws IOException {
LOG.info("Extracting text file");
long end = start + length;
@@ -185,6 +200,35 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
filestream.close();
}
+ private void extractParquetFile(LinkConfiguration linkConfiguration,
+ FromJobConfiguration fromJobConfiguration,
+ Path file, long start, long length,
+ String[] locations) throws IOException, InterruptedException {
+ // Parquet does not expose a way to directly deal with file splits
+ // except through the ParquetInputFormat (ParquetInputSplit is @private)
+ FileSplit fileSplit = new FileSplit(file, start, length, locations);
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, AvroReadSupport.class.getName());
+ ParquetInputFormat parquetInputFormat = new ParquetInputFormat();
+
+ // ParquetReader needs a TaskAttemptContext to pass through the
+ // configuration object.
+ TaskAttemptContext taskAttemptContext = new SqoopTaskAttemptContext(conf);
+
+ RecordReader<Void, GenericRecord> recordReader = parquetInputFormat.createRecordReader(fileSplit, taskAttemptContext);
+ recordReader.initialize(fileSplit, taskAttemptContext);
+
+ AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema);
+ while (recordReader.nextKeyValue() != false) {
+ GenericRecord record = recordReader.getCurrentValue();
+ rowsRead++;
+ if (schema instanceof ByteArraySchema) {
+ dataWriter.writeArrayRecord(new Object[]{idf.toObject(record)});
+ } else {
+ dataWriter.writeArrayRecord(idf.toObject(record));
+ }
+ }
+ }
+
@Override
public long getRowsRead() {
return rowsRead;
@@ -207,6 +251,41 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
return true;
}
+ private boolean isParquetFile(Path file) {
+ try {
+ FileSystem fileSystem = file.getFileSystem(conf);
+ FileStatus fileStatus = fileSystem.getFileStatus(file);
+ FSDataInputStream fsDataInputStream = fileSystem.open(file);
+
+ long fileLength = fileStatus.getLen();
+
+ byte[] fileStart = new byte[PARQUET_MAGIC.length];
+ fsDataInputStream.readFully(fileStart);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.error("file start: " + new String(fileStart, Charset.forName("ASCII")));
+ }
+
+ if (!Arrays.equals(fileStart, PARQUET_MAGIC)) {
+ return false;
+ }
+
+ long fileEndIndex = fileLength - PARQUET_MAGIC.length;
+ fsDataInputStream.seek(fileEndIndex);
+
+ byte[] fileEnd = new byte[PARQUET_MAGIC.length];
+ fsDataInputStream.readFully(fileEnd);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.error("file end: " + new String(fileEnd, Charset.forName("ASCII")));
+ }
+
+ return Arrays.equals(fileEnd, PARQUET_MAGIC);
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
if (schema instanceof ByteArraySchema) {
dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
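The detection added above relies on Parquet's magic number: every Parquet file opens and closes with the four ASCII bytes "PAR1". A minimal standalone sketch of the same check, using plain java.io instead of the Hadoop FileSystem API (the length guard and the use of a local path are illustrative assumptions, not part of the commit):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class ParquetMagicCheck {
      // Every Parquet file starts and ends with these four ASCII bytes.
      private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

      public static boolean isParquetFile(String path) {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
          if (file.length() < 2L * PARQUET_MAGIC.length) {
            return false; // too short to carry both the header and the footer magic
          }
          byte[] head = new byte[PARQUET_MAGIC.length];
          file.readFully(head);

          byte[] tail = new byte[PARQUET_MAGIC.length];
          file.seek(file.length() - PARQUET_MAGIC.length);
          file.readFully(tail);

          return Arrays.equals(head, PARQUET_MAGIC) && Arrays.equals(tail, PARQUET_MAGIC);
        } catch (IOException e) {
          return false; // unreadable files are treated as non-Parquet, as in the extractor
        }
      }
    }

The extractor itself performs the equivalent reads through FSDataInputStream so the check also works against HDFS paths.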
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 5de20c6..7cef93c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -32,6 +32,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
import org.apache.sqoop.error.code.HdfsConnectorError;
@@ -89,7 +90,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
GenericHdfsWriter filewriter = getWriter(toJobConfig);
- filewriter.initialize(filepath, conf, codec);
+ filewriter.initialize(filepath, context.getSchema(), conf, codec);
if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
String record;
@@ -119,8 +120,14 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
}
private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) {
- return (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) ? new HdfsSequenceWriter()
- : new HdfsTextWriter();
+ switch(toJobConf.toJobConfig.outputFormat) {
+ case SEQUENCE_FILE:
+ return new HdfsSequenceWriter();
+ case PARQUET_FILE:
+ return new HdfsParquetWriter();
+ default:
+ return new HdfsTextWriter();
+ }
}
private String getCompressionCodecName(ToJobConfiguration toJobConf) {
@@ -151,11 +158,16 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
//TODO: We should probably support configurable extensions at some point
private static String getExtension(ToJobConfiguration toJobConf, CompressionCodec codec) {
- if (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE)
- return ".seq";
- if (codec == null)
- return ".txt";
- return codec.getDefaultExtension();
+ switch(toJobConf.toJobConfig.outputFormat) {
+ case SEQUENCE_FILE:
+ return ".seq";
+ case PARQUET_FILE:
+ return ".parquet";
+ default:
+ if (codec == null)
+ return ".txt";
+ return codec.getDefaultExtension();
+ }
}
/* (non-Javadoc)
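Together with the GenericHdfsWriter signature change further down, the loader's contract is now: choose a writer for the configured output format, initialize it with the output path, the Sqoop schema, the Hadoop configuration and an optional compression codec, feed it CSV rows, then destroy it. A schematic caller is sketched below; the path, schema and row values are assumptions made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
    import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    public class WriterDispatchSketch {
      // Mirrors HdfsLoader.getWriter(): one writer implementation per output format.
      static GenericHdfsWriter writerFor(ToFormat format) {
        switch (format) {
          case SEQUENCE_FILE: return new HdfsSequenceWriter();
          case PARQUET_FILE:  return new HdfsParquetWriter();
          default:            return new HdfsTextWriter();
        }
      }

      public static void main(String[] args) throws Exception {
        Schema schema = new Schema("cities")
            .addColumn(new FixedPoint("id", 8L, true))
            .addColumn(new Text("city"));

        GenericHdfsWriter writer = writerFor(ToFormat.PARQUET_FILE);
        // A null codec means uncompressed output; the Parquet writer maps it to CompressionCodecName.UNCOMPRESSED.
        writer.initialize(new Path("/tmp/cities.parquet"), schema, new Configuration(), null);
        writer.write("1,'San Francisco'");
        writer.destroy();
      }
    }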
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java
index 27d121f..ffce583 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java
@@ -30,4 +30,9 @@ public enum ToFormat {
* Sequence file
*/
SEQUENCE_FILE,
+
+ /**
+ * Parquet file
+ */
+ PARQUET_FILE,
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
index 2ccccc4..31023e7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
@@ -20,12 +20,13 @@ package org.apache.sqoop.connector.hdfs.hdfsWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.schema.Schema;
import java.io.IOException;
public abstract class GenericHdfsWriter {
- public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
+ public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException;
public abstract void write(String csv) throws IOException;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
new file mode 100644
index 0000000..4ec813b
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
+import org.apache.sqoop.schema.Schema;
+
+import java.io.IOException;
+
+public class HdfsParquetWriter extends GenericHdfsWriter {
+
+ private ParquetWriter avroParquetWriter;
+ private Schema sqoopSchema;
+ private AVROIntermediateDataFormat avroIntermediateDataFormat;
+
+ @Override
+ public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec hadoopCodec) throws IOException {
+ sqoopSchema = schema;
+ avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+
+ CompressionCodecName parquetCodecName;
+ if (hadoopCodec == null) {
+ parquetCodecName = CompressionCodecName.UNCOMPRESSED;
+ } else {
+ parquetCodecName = CompressionCodecName.fromCompressionCodec(hadoopCodec.getClass());
+ }
+
+ avroParquetWriter =
+ AvroParquetWriter.builder(filepath)
+ .withSchema(avroIntermediateDataFormat.getAvroSchema())
+ .withCompressionCodec(parquetCodecName)
+ .withConf(conf).build();
+
+ }
+
+ @Override
+ public void write(String csv) throws IOException {
+ avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv));
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ avroParquetWriter.close();
+ }
+}
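The builder chain used in initialize() above comes straight from parquet-avro and can be exercised on its own. A minimal sketch with a hand-built Avro schema and a local output path, both of which are illustrative assumptions:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class AvroParquetWriteSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative two-column schema; HdfsParquetWriter derives its schema from AVROIntermediateDataFormat instead.
        Schema schema = SchemaBuilder.record("city").fields()
            .requiredLong("id")
            .requiredString("name")
            .endRecord();

        ParquetWriter<GenericRecord> writer = AvroParquetWriter
            .<GenericRecord>builder(new Path("/tmp/cities.parquet"))
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .build();

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 1L);
        record.put("name", "San Francisco");
        writer.write(record);

        writer.close();  // flushes the row group and writes the Parquet footer
      }
    }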
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
index 75c2e7e..dcce861 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
@@ -23,16 +23,17 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.schema.Schema;
import java.io.IOException;
-public class HdfsSequenceWriter extends GenericHdfsWriter {
+public class HdfsSequenceWriter extends GenericHdfsWriter {
private SequenceFile.Writer filewriter;
private Text text;
@SuppressWarnings("deprecation")
- public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+ public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
conf, filepath, Text.class, NullWritable.class,
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
index 78cf973..384e330 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
+import org.apache.sqoop.schema.Schema;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
@@ -34,7 +35,7 @@ public class HdfsTextWriter extends GenericHdfsWriter {
private BufferedWriter filewriter;
@Override
- public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+ public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
FileSystem fs = filepath.getFileSystem(conf);
DataOutputStream filestream = fs.create(filepath, false);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index adede3a..cbd555a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -17,9 +17,6 @@
*/
package org.apache.sqoop.connector.hdfs;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -27,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -35,11 +33,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
@@ -47,13 +51,18 @@ import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;
-import org.testng.annotations.AfterMethod;
+import org.apache.sqoop.utils.ClassUtils;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.PARQUET_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
public class TestLoader extends TestHdfsBase {
private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
@@ -63,6 +72,7 @@ public class TestLoader extends TestHdfsBase {
private final String outputDirectory;
private Loader loader;
private String user = "test_user";
+ private Schema schema;
@Factory(dataProvider="test-hdfs-loader")
public TestLoader(ToFormat outputFormat,
@@ -80,9 +90,10 @@ public class TestLoader extends TestHdfsBase {
for (ToCompression compression : new ToCompression[]{
ToCompression.DEFAULT,
ToCompression.BZIP2,
+ ToCompression.GZIP,
ToCompression.NONE
}) {
- for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+ for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE, PARQUET_FILE}) {
parameters.add(new Object[]{outputFileType, compression});
}
}
@@ -100,7 +111,7 @@ public class TestLoader extends TestHdfsBase {
@Test
public void testLoader() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
- Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+ schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 4L))
.addColumn(new Text("col3"));
@@ -130,14 +141,22 @@ public class TestLoader extends TestHdfsBase {
assertTestUser(user);
return null;
}
- }, null, user);
+ }, schema, user);
LinkConfiguration linkConf = new LinkConfiguration();
ToJobConfiguration jobConf = new ToJobConfiguration();
jobConf.toJobConfig.compression = compression;
jobConf.toJobConfig.outputFormat = outputFormat;
Path outputPath = new Path(outputDirectory);
- loader.load(context, linkConf, jobConf);
+ try {
+ loader.load(context, linkConf, jobConf);
+ } catch (Exception e) {
+ // we expect to fail if the compression format selected is not supported by the
+ // output format
+ Assert.assertTrue(compressionNotSupported());
+ return;
+ }
+
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
@@ -152,10 +171,26 @@ public class TestLoader extends TestHdfsBase {
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
+ private boolean compressionNotSupported() {
+ switch (outputFormat) {
+ case SEQUENCE_FILE:
+ return compression == ToCompression.GZIP;
+ case PARQUET_FILE:
+ return compression == ToCompression.BZIP2 || compression == ToCompression.DEFAULT;
+ }
+ return false;
+ }
+
@Test
public void testOverrideNull() throws Exception {
+ // Parquet supports an actual "null" value so overriding null would not make
+ // sense here
+ if (outputFormat == PARQUET_FILE) {
+ return;
+ }
+
FileSystem fs = FileSystem.get(new Configuration());
- Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+ schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
.addColumn(new FloatingPoint("col2", 8L))
.addColumn(new Text("col3"))
.addColumn(new Text("col4"));
@@ -199,7 +234,15 @@ public class TestLoader extends TestHdfsBase {
jobConf.toJobConfig.nullValue = "\\N";
Path outputPath = new Path(outputDirectory);
- loader.load(context, linkConf, jobConf);
+ try {
+ loader.load(context, linkConf, jobConf);
+ } catch (Exception e) {
+ // we expect to fail if the compression format selected is not supported by the
+ // output format
+ assert(compressionNotSupported());
+ return;
+ }
+
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
@@ -214,7 +257,7 @@ public class TestLoader extends TestHdfsBase {
Assert.assertEquals(5, fs.listStatus(outputPath).length);
}
- private void verifyOutput(FileSystem fs, Path file, String format) throws IOException {
+ private void verifyOutput(FileSystem fs, Path file, String format) throws Exception {
Configuration conf = new Configuration();
FSDataInputStream fsin = fs.open(file);
CompressionCodec codec;
@@ -228,7 +271,9 @@ public class TestLoader extends TestHdfsBase {
case BZIP2:
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
break;
-
+ case GZIP:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
+ break;
case DEFAULT:
if(org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b1\\.\\d\\.\\d")) {
Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
@@ -283,10 +328,46 @@ public class TestLoader extends TestHdfsBase {
line = new org.apache.hadoop.io.Text();
}
break;
+ case PARQUET_FILE:
+ String compressionCodecClassName = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER).getBlocks().get(0).getColumns().get(0).getCodec().getHadoopCompressionCodecClassName();
+
+ if (compressionCodecClassName == null) {
+ codec = null;
+ } else {
+ codec = (CompressionCodec) ClassUtils.loadClass(compressionCodecClassName).newInstance();
+ }
+
+ // Verify compression
+ switch(compression) {
+ case GZIP:
+ Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
+ break;
+
+ case NONE:
+ default:
+ Assert.assertNull(codec);
+ break;
+ }
+
+
+ ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+ AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat();
+ avroIntermediateDataFormat.setSchema(schema);
+ GenericRecord record;
+ index = 1;
+ while ((record = avroParquetReader.read()) != null) {
+ List<Object> objects = new ArrayList<>();
+ for (int i = 0; i < record.getSchema().getFields().size(); i++) {
+ objects.add(record.get(i));
+ }
+ Assert.assertEquals(SqoopIDFUtils.toText(avroIntermediateDataFormat.toCSV(record)), formatRow(format, index++));
+ }
+
+ break;
}
}
- private void verifyOutput(FileSystem fs, Path file) throws IOException {
+ private void verifyOutput(FileSystem fs, Path file) throws Exception {
verifyOutput(fs, file, "%d,%f,%s");
}
-}
+}
\ No newline at end of file
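The PARQUET_FILE branch above verifies output in two steps: the footer metadata tells it which compression codec the first column chunk was written with, and AvroParquetReader streams the records back for comparison. A condensed sketch of that read-back path, with an assumed file location:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class ParquetReadBackSketch {
      public static void main(String[] args) throws Exception {
        Path file = new Path("/tmp/cities.parquet");  // assumed location
        Configuration conf = new Configuration();

        // The footer records the codec used for every column chunk.
        CompressionCodecName codec = ParquetFileReader
            .readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
            .getBlocks().get(0).getColumns().get(0).getCodec();
        System.out.println("codec: " + codec);

        // Stream the rows back as Avro GenericRecords.
        ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
        GenericRecord record;
        while ((record = reader.read()) != null) {
          System.out.println(record);
        }
        reader.close();
      }
    }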
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
index 985149c..89bc0f2 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
@@ -43,7 +43,8 @@ public class SqoopAvroUtils {
* Creates an Avro schema from a Sqoop schema.
*/
public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
- String name = sqoopSchema.getName();
+ // avro schema names cannot start with quotes, lets just remove them
+ String name = sqoopSchema.getName().replace("\"", "");
String doc = sqoopSchema.getNote();
String namespace = SQOOP_SCHEMA_NAMESPACE;
Schema schema = Schema.createRecord(name, doc, namespace, false);
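The quote stripping matters because Avro record names must match [A-Za-z_][A-Za-z0-9_]*, so a delimited SQL identifier such as "CITIES" (quotes included) carried into the Sqoop schema name would make Schema.createRecord throw a SchemaParseException. A small sketch of the failure and the fix; the quoted input name and the namespace value are assumptions for illustration:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaParseException;

    public class AvroSchemaNameSketch {
      public static void main(String[] args) {
        String sqoopSchemaName = "\"CITIES\"";  // assumed: a quoted identifier preserved by the JDBC connector

        try {
          Schema.createRecord(sqoopSchemaName, null, "org.apache.sqoop", false);
        } catch (SchemaParseException e) {
          System.out.println("rejected: " + e.getMessage());
        }

        // Dropping the quotes yields a legal Avro record name.
        String avroName = sqoopSchemaName.replace("\"", "");
        Schema schema = Schema.createRecord(avroName, null, "org.apache.sqoop", false);
        System.out.println("created: " + schema.getFullName());
      }
    }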
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index ace1bdf..e409fc1 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -148,7 +148,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
return jars;
}
- private GenericRecord toAVRO(String csv) {
+ public GenericRecord toAVRO(String csv) {
String[] csvStringArray = parseCSVString(csv);
@@ -175,7 +175,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
return avroObject;
}
- private Object toAVRO(String csvString, Column column) {
+ public Object toAVRO(String csvString, Column column) {
Object returnValue = null;
switch (column.getType()) {
@@ -232,7 +232,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
return returnValue;
}
- private GenericRecord toAVRO(Object[] objectArray) {
+ public GenericRecord toAVRO(Object[] objectArray) {
if (objectArray == null) {
return null;
@@ -311,7 +311,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
}
@SuppressWarnings("unchecked")
- private String toCSV(GenericRecord record) {
+ public String toCSV(GenericRecord record) {
Column[] columns = this.schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
@@ -387,7 +387,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
}
@SuppressWarnings("unchecked")
- private Object[] toObject(GenericRecord record) {
+ public Object[] toObject(GenericRecord record) {
if (record == null) {
return null;
@@ -459,4 +459,8 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
}
return object;
}
+
+ public Schema getAvroSchema() {
+ return avroSchema;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb8a973..ba0a243 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,7 @@ limitations under the License.
<groovy.version>2.4.0</groovy.version>
<jansi.version>1.7</jansi.version>
<felix.version>2.4.0</felix.version>
+ <parquet.version>1.8.1</parquet.version>
<!-- maven plugin versions -->
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
</properties>
@@ -700,6 +701,16 @@ limitations under the License.
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 451352a..134bca1 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -175,6 +175,16 @@ limitations under the License.
<artifactId>hadoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
</dependencies>
<!-- Add classifier name to the JAR name -->
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
index 3ec4f66..1e8c688 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
@@ -20,17 +20,27 @@ package org.apache.sqoop.integration.connector.hdfs;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.test.asserts.HdfsAsserts;
import org.apache.sqoop.test.infrastructure.Infrastructure;
import org.apache.sqoop.test.infrastructure.SqoopTestCase;
@@ -51,6 +61,7 @@ import org.testng.annotations.Test;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
@@ -64,6 +75,9 @@ public class NullValueTest extends SqoopTestCase {
// The custom nullValue to use (set to null if default)
private String nullValue;
+
+ private Schema sqoopSchema;
+
@DataProvider(name="nul-value-test")
public static Object[][] data(ITestContext context) {
String customNullValue = "^&*custom!@";
@@ -80,12 +94,19 @@ public class NullValueTest extends SqoopTestCase {
}
@Override
+
public String getTestName() {
return methodName + "[" + format.name() + ", " + nullValue + "]";
}
@BeforeMethod
public void setup() throws Exception {
+ sqoopSchema = new Schema("cities");
+ sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
+ sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("country"));
+ sqoopSchema.addColumn(new DateTime("some_date", true, false));
+ sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("city"));
+
createTableCities();
}
@@ -128,6 +149,27 @@ public class NullValueTest extends SqoopTestCase {
}
sequenceFileWriter.close();
break;
+ case PARQUET_FILE:
+ // Parquet file format does not support using custom null values
+ if (usingCustomNullValue()) {
+ return;
+ } else {
+ HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
+
+ Configuration conf = new Configuration();
+ FileSystem.setDefaultUri(conf, hdfsClient.getUri());
+
+ parquetWriter.initialize(
+ new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
+ sqoopSchema, conf, null);
+
+ for (String line : getCsv()) {
+ parquetWriter.write(line);
+ }
+
+ parquetWriter.destroy();
+ break;
+ }
default:
Assert.fail();
}
@@ -166,6 +208,11 @@ public class NullValueTest extends SqoopTestCase {
@Test
public void testToHdfs() throws Exception {
+ // Parquet file format does not support using custom null values
+ if (usingCustomNullValue() && format == ToFormat.PARQUET_FILE) {
+ return;
+ }
+
provider.insertRow(getTableName(), 1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
@@ -203,16 +250,16 @@ public class NullValueTest extends SqoopTestCase {
executeJob(job);
+
+ Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
+ Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+ List<String> notFound = new ArrayList<>();
switch (format) {
case TEXT_FILE:
HdfsAsserts.assertMapreduceOutput(hdfsClient,
HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
- break;
+ return;
case SEQUENCE_FILE:
- Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
- List<String> notFound = new ArrayList<>();
- Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
-
for(Path file : files) {
SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
@@ -224,17 +271,32 @@ public class NullValueTest extends SqoopTestCase {
}
}
}
- if(!setLines.isEmpty() || !notFound.isEmpty()) {
- LOG.error("Output do not match expectations.");
- LOG.error("Expected lines that weren't present in the files:");
- LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
- LOG.error("Extra lines in files that weren't expected:");
- LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
- Assert.fail("Output do not match expectations.");
+ break;
+ case PARQUET_FILE:
+ AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+ notFound = new LinkedList<>();
+ for (Path file : files) {
+ ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+ GenericRecord record;
+ while ((record = avroParquetReader.read()) != null) {
+ String recordAsCsv = avroIntermediateDataFormat.toCSV(record);
+ if (!setLines.remove(recordAsCsv)) {
+ notFound.add(recordAsCsv);
+ }
+ }
}
break;
default:
Assert.fail();
}
+
+ if(!setLines.isEmpty() || !notFound.isEmpty()) {
+ LOG.error("Output do not match expectations.");
+ LOG.error("Expected lines that weren't present in the files:");
+ LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
+ LOG.error("Extra lines in files that weren't expected:");
+ LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
+ Assert.fail("Output do not match expectations.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
new file mode 100644
index 0000000..222c493
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class ParquetTest extends SqoopTestCase {
+
+ @AfterMethod
+ public void dropTable() {
+ super.dropTable();
+ }
+
+ @Test
+ public void toParquetTest() throws Exception {
+ createAndLoadTableCities();
+
+ // RDBMS link
+ MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+ fillRdbmsLinkConfig(rdbmsConnection);
+ saveLink(rdbmsConnection);
+
+ // HDFS link
+ MLink hdfsConnection = getClient().createLink("hdfs-connector");
+ fillHdfsLink(hdfsConnection);
+ saveLink(hdfsConnection);
+
+ // Job creation
+ MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+
+
+ // Set rdbms "FROM" config
+ fillRdbmsFromConfig(job, "id");
+
+ // Fill the hdfs "TO" config
+ fillHdfsToConfig(job, ToFormat.PARQUET_FILE);
+
+ saveJob(job);
+ executeJob(job);
+
+ String[] expectedOutput =
+ {"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
+ "'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
+ "'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
+ "'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};
+
+
+ Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));
+
+ List<String> notFound = new LinkedList<>();
+
+ Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, getMapreduceDirectory());
+ for (Path file : files) {
+ ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+ GenericRecord record;
+ while ((record = avroParquetReader.read()) != null) {
+ String recordAsLine = recordToLine(record);
+ if (!setLines.remove(recordAsLine)) {
+ notFound.add(recordAsLine);
+ }
+ }
+ }
+
+ if (!setLines.isEmpty() || !notFound.isEmpty()) {
+ fail("Output do not match expectations.");
+ }
+ }
+
+ @Test
+ public void fromParquetTest() throws Exception {
+ createTableCities();
+
+ Schema sqoopSchema = new Schema("cities");
+ sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
+ sqoopSchema.addColumn(new Text("country"));
+ sqoopSchema.addColumn(new DateTime("some_date", true, false));
+ sqoopSchema.addColumn(new Text("city"));
+
+ HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
+
+ Configuration conf = new Configuration();
+ FileSystem.setDefaultUri(conf, hdfsClient.getUri());
+
+ parquetWriter.initialize(
+ new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
+ sqoopSchema, conf, null);
+
+ parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
+ parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
+
+ parquetWriter.destroy();
+
+ parquetWriter.initialize(
+ new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
+ sqoopSchema, conf, null);
+
+ parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
+ parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
+
+ parquetWriter.destroy();
+
+ // RDBMS link
+ MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+ fillRdbmsLinkConfig(rdbmsLink);
+ saveLink(rdbmsLink);
+
+ // HDFS link
+ MLink hdfsLink = getClient().createLink("hdfs-connector");
+ fillHdfsLink(hdfsLink);
+ saveLink(hdfsLink);
+
+ // Job creation
+ MJob job = getClient().createJob(hdfsLink.getName(), rdbmsLink.getName());
+ fillHdfsFromConfig(job);
+ fillRdbmsToConfig(job);
+ saveJob(job);
+
+ executeJob(job);
+ assertEquals(provider.rowCount(getTableName()), 4);
+ assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
+ assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
+ assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+ assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+ }
+
+ public String recordToLine(GenericRecord genericRecord) {
+ String line = "";
+ line += "\'" + String.valueOf(genericRecord.get(0)) + "\',";
+ line += "\'" + String.valueOf(genericRecord.get(1)) + "\',";
+ line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',";
+ line += "\'" + String.valueOf(genericRecord.get(3)) + "\'";
+ return line;
+ }
+
+}
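One subtlety in recordToLine() above: the DateTime column comes back from the Avro record as a Long (apparently epoch milliseconds, judging by the cast), and java.sql.Timestamp.toString() renders a whole-second value with a single fractional digit ("2004-10-23 00:00:00.0"), so the test appends "00" to reach the three-digit fraction used in the expected CSV lines. A short illustration of that formatting quirk; it only lines up because the test data uses whole-second timestamps:

    import java.sql.Timestamp;

    public class TimestampFormatSketch {
      public static void main(String[] args) {
        long epochMillis = Timestamp.valueOf("2004-10-23 00:00:00.000").getTime();

        // Timestamp.toString() trims the fraction of a whole-second value to ".0"...
        System.out.println(new Timestamp(epochMillis));          // 2004-10-23 00:00:00.0

        // ...so appending "00" pads it back out to the ".000" form the expected output uses.
        System.out.println(new Timestamp(epochMillis) + "00");   // 2004-10-23 00:00:00.000
      }
    }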