You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/19 17:43:10 UTC

git commit: SQOOP-1390: Import data to HDFS as a set of Parquet files

Repository: sqoop
Updated Branches:
  refs/heads/trunk 74ec89452 -> 2e1e09422


SQOOP-1390: Import data to HDFS as a set of Parquet files

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2e1e0942
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2e1e0942
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2e1e0942

Branch: refs/heads/trunk
Commit: 2e1e09422cdeb1a0d6ead104599687fae6ac55f4
Parents: 74ec894
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Aug 19 08:42:48 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Aug 19 08:42:48 2014 -0700

----------------------------------------------------------------------
 ivy.xml                                         |   2 +
 ivy/libraries.properties                        |   2 +
 src/docs/man/import-args.txt                    |   3 +
 src/docs/man/sqoop-import-all-tables.txt        |   3 +
 src/docs/user/hcatalog.txt                      |   1 +
 src/docs/user/help.txt                          |   1 +
 src/docs/user/import-all-tables.txt             |   1 +
 src/docs/user/import.txt                        |   1 +
 src/java/com/cloudera/sqoop/SqoopOptions.java   |   3 +-
 src/java/org/apache/sqoop/avro/AvroUtil.java    |  69 +++++++
 .../org/apache/sqoop/lib/SqoopAvroRecord.java   |  57 ++++++
 .../sqoop/mapreduce/AvroImportMapper.java       |  48 +----
 .../sqoop/mapreduce/DataDrivenImportJob.java    |  23 +++
 .../sqoop/mapreduce/ParquetImportMapper.java    |  70 +++++++
 .../org/apache/sqoop/mapreduce/ParquetJob.java  |  77 +++++++
 src/java/org/apache/sqoop/orm/ClassWriter.java  |  57 +++++-
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |   1 +
 src/java/org/apache/sqoop/tool/ImportTool.java  |  12 +-
 src/java/org/apache/sqoop/util/AppendUtils.java |  13 +-
 src/licenses/LICENSE-BIN.txt                    |  12 ++
 .../com/cloudera/sqoop/TestParquetImport.java   | 200 +++++++++++++++++++
 21 files changed, 601 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index abc12a1..e5334f1 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -183,6 +183,8 @@ under the License.
       conf="common->default;redist->default"/>
     <dependency org="commons-io" name="commons-io" rev="${commons-io.version}"
       conf="common->default;redist->default"/>
+    <dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data-mapreduce.version}"
+      conf="avro->default;redist->default"/>
 
     <!-- dependencies for static analysis -->
     <dependency org="checkstyle" name="checkstyle" rev="${checkstyle.version}"

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index a59471e..cbcbf0d 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -20,6 +20,8 @@
 
 avro.version=1.7.5
 
+kite-data-mapreduce.version=0.15.0
+
 checkstyle.version=5.0
 
 commons-cli.version=1.2

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/man/import-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/import-args.txt b/src/docs/man/import-args.txt
index a4ce4ec..2bb69ba 100644
--- a/src/docs/man/import-args.txt
+++ b/src/docs/man/import-args.txt
@@ -33,6 +33,9 @@ Import control options
 --as-textfile::
   Imports data as plain text (default)
 
+--as-parquetfile::
+  Imports data to Parquet Files
+
 --boundary-query (query)::
   Using following query to select minimal and maximal value of '--split-by' column for creating splits
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/man/sqoop-import-all-tables.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/sqoop-import-all-tables.txt b/src/docs/man/sqoop-import-all-tables.txt
index 6b639f5..6db38ad 100644
--- a/src/docs/man/sqoop-import-all-tables.txt
+++ b/src/docs/man/sqoop-import-all-tables.txt
@@ -36,6 +36,9 @@ Import control options
 --as-textfile::
   Imports data as plain text (default)
 
+--as-parquetfile::
+  Imports data to Parquet Files
+
 --direct::
   Use direct import fast path (mysql only)
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/user/hcatalog.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
index cd1dde3..98f4ac5 100644
--- a/src/docs/user/hcatalog.txt
+++ b/src/docs/user/hcatalog.txt
@@ -169,6 +169,7 @@ The following Sqoop export and import options are not supported with HCatalog jo
 * +--append+
 * +--as-sequencefile+
 * +--as-avrofile+
+* +--as-parquetfile+
 
 Ignored Sqoop Options
 ^^^^^^^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/user/help.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/help.txt b/src/docs/user/help.txt
index a9e1e89..8a0d147 100644
--- a/src/docs/user/help.txt
+++ b/src/docs/user/help.txt
@@ -81,6 +81,7 @@ Import control arguments:
    --as-avrodatafile             Imports data to Avro Data Files
    --as-sequencefile             Imports data to SequenceFiles
    --as-textfile                 Imports data as plain text (default)
+   --as-parquetfile              Imports data to Parquet Data Files
 ...
 ----
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/user/import-all-tables.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import-all-tables.txt b/src/docs/user/import-all-tables.txt
index 60645f1..166825a 100644
--- a/src/docs/user/import-all-tables.txt
+++ b/src/docs/user/import-all-tables.txt
@@ -47,6 +47,7 @@ Argument                     Description
 +\--as-avrodatafile+         Imports data to Avro Data Files
 +\--as-sequencefile+         Imports data to SequenceFiles
 +\--as-textfile+             Imports data as plain text (default)
++\--as-parquetfile+          Imports data to Parquet Files
 +\--direct+                  Use direct import fast path
 +\--inline-lob-limit <n>+    Set the maximum size for an inline LOB
 +-m,\--num-mappers <n>+      Use 'n' map tasks to import in parallel

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/docs/user/import.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 192e97e..c5ffa50 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -59,6 +59,7 @@ Argument                          Description
 +\--as-avrodatafile+              Imports data to Avro Data Files
 +\--as-sequencefile+              Imports data to SequenceFiles
 +\--as-textfile+                  Imports data as plain text (default)
++\--as-parquetfile+               Imports data to Parquet Files
 +\--boundary-query <statement>+   Boundary query to use for creating splits
 +\--columns <col,col,col...>+     Columns to import from table
 +\--delete-target-dir+            Delete the import target directory\

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/com/cloudera/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/com/cloudera/sqoop/SqoopOptions.java b/src/java/com/cloudera/sqoop/SqoopOptions.java
index ffec2dc..f4ababe 100644
--- a/src/java/com/cloudera/sqoop/SqoopOptions.java
+++ b/src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -39,7 +39,8 @@ public class SqoopOptions
   public enum FileLayout {
     TextFile,
     SequenceFile,
-    AvroDataFile
+    AvroDataFile,
+    ParquetFile
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
new file mode 100644
index 0000000..4b37d58
--- /dev/null
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.avro;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.BlobRef;
+import org.apache.sqoop.lib.ClobRef;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * The service class provides methods for creating and converting Avro objects.
+ */
+public final class AvroUtil {
+
+  /**
+   * Convert the Avro representation of a Java type (that has already been
+   * converted from the SQL equivalent). Note that the method is taken from
+   * {@link org.apache.sqoop.mapreduce.AvroImportMapper}
+   */
+  public static Object toAvro(Object o, boolean bigDecimalFormatString) {
+    if (o instanceof BigDecimal) {
+      if (bigDecimalFormatString) {
+        return ((BigDecimal)o).toPlainString();
+      } else {
+        return o.toString();
+      }
+    } else if (o instanceof Date) {
+      return ((Date) o).getTime();
+    } else if (o instanceof Time) {
+      return ((Time) o).getTime();
+    } else if (o instanceof Timestamp) {
+      return ((Timestamp) o).getTime();
+    } else if (o instanceof BytesWritable) {
+      BytesWritable bw = (BytesWritable) o;
+      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
+    } else if (o instanceof BlobRef) {
+      BlobRef br = (BlobRef) o;
+      // If blob data is stored in an external .lob file, save the ref file
+      // as Avro bytes. If materialized inline, save blob data as Avro bytes.
+      byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
+      return ByteBuffer.wrap(bytes);
+    } else if (o instanceof ClobRef) {
+      throw new UnsupportedOperationException("ClobRef not supported");
+    }
+    // primitive types (Integer, etc) are left unchanged
+    return o;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
new file mode 100644
index 0000000..80875d2
--- /dev/null
+++ b/src/java/org/apache/sqoop/lib/SqoopAvroRecord.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sqoop.avro.AvroUtil;
+
+/**
+ * The abstract class extends {@link org.apache.sqoop.lib.SqoopRecord}. It also
+ * implements the interface GenericRecord which is a generic instance of an Avro
+ * record schema. Fields are accessible by name as well as by index.
+ */
+public abstract class SqoopAvroRecord extends SqoopRecord implements GenericRecord {
+
+  public abstract boolean getBigDecimalFormatString();
+
+  @Override
+  public void put(String key, Object v) {
+    getFieldMap().put(key, v);
+  }
+
+  @Override
+  public Object get(String key) {
+    Object o = getFieldMap().get(key);
+    return AvroUtil.toAvro(o, getBigDecimalFormatString());
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    put(getFieldNameByIndex(i), v);
+  }
+
+  @Override
+  public Object get(int i) {
+    return get(getFieldNameByIndex(i));
+  }
+
+  private String getFieldNameByIndex(int i) {
+    return getSchema().getFields().get(i).name();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
index 289eb28..6fc656f 100644
--- a/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
@@ -19,27 +19,20 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
 import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
 import com.cloudera.sqoop.lib.LargeObjectLoader;
 import com.cloudera.sqoop.lib.SqoopRecord;
 import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.avro.AvroUtil;
 
 /**
  * Imports records by transforming them to Avro records in an Avro data file.
@@ -92,45 +85,10 @@ public class AvroImportMapper
     Map<String, Object> fieldMap = val.getFieldMap();
     GenericRecord record = new GenericData.Record(schema);
     for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      record.put(entry.getKey(), toAvro(entry.getValue()));
+      Object avro = AvroUtil.toAvro(entry.getValue(), bigDecimalFormatString);
+      record.put(entry.getKey(), avro);
     }
     return record;
   }
 
-  /**
-   * Convert the Avro representation of a Java type (that has already been
-   * converted from the SQL equivalent).
-   * @param o
-   * @return
-   */
-  private Object toAvro(Object o) {
-    if (o instanceof BigDecimal) {
-      if (bigDecimalFormatString) {
-        return ((BigDecimal)o).toPlainString();
-      } else {
-        return o.toString();
-      }
-    } else if (o instanceof Date) {
-      return ((Date) o).getTime();
-    } else if (o instanceof Time) {
-      return ((Time) o).getTime();
-    } else if (o instanceof Timestamp) {
-      return ((Timestamp) o).getTime();
-    } else if (o instanceof BytesWritable) {
-      BytesWritable bw = (BytesWritable) o;
-      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
-    } else if (o instanceof BlobRef) {
-      BlobRef br = (BlobRef) o;
-      // If blob data is stored in an external .lob file, save the ref file
-      // as Avro bytes. If materialized inline, save blob data as Avro bytes.
-      byte[] bytes = br.isExternal() ? br.toString().getBytes() : br.getData();
-      return ByteBuffer.wrap(bytes);
-    } else if (o instanceof ClobRef) {
-      throw new UnsupportedOperationException("ClobRef not suported");
-    }
-    // primitive types (Integer, etc) are left unchanged
-    return o;
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 6dcfebb..300406a 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -26,6 +26,8 @@ import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -45,6 +47,7 @@ import com.cloudera.sqoop.mapreduce.ImportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
@@ -95,6 +98,20 @@ public class DataDrivenImportJob extends ImportJobBase {
       }
 
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.ParquetFile) {
+      Configuration conf = job.getConfiguration();
+      // An Avro schema is required for creating a dataset that manages
+      // Parquet data records. The import will fail, if schema is invalid.
+      Schema schema = new Schema.Parser().parse(conf.get("avro.schema"));
+      String uri = "";
+      if (options.doHiveImport()) {
+        // TODO: SQOOP-1393
+      } else {
+        FileSystem fs = FileSystem.get(conf);
+        uri = "dataset:" + fs.makeQualified(getContext().getDestination());
+      }
+      ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode());
     }
 
     job.setMapperClass(getMapperClass());
@@ -129,6 +146,9 @@ public class DataDrivenImportJob extends ImportJobBase {
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.AvroDataFile) {
       return AvroImportMapper.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.ParquetFile) {
+      return ParquetImportMapper.class;
     }
 
     return null;
@@ -149,6 +169,9 @@ public class DataDrivenImportJob extends ImportJobBase {
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.AvroDataFile) {
       return AvroOutputFormat.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.ParquetFile) {
+      return DatasetKeyOutputFormat.class;
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
new file mode 100644
index 0000000..cc2982c
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.lib.SqoopAvroRecord;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Imports records by writing them to a Parquet File.
+ */
+public class ParquetImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopAvroRecord,
+        GenericRecord, NullWritable> {
+
+  private LargeObjectLoader lobLoader = null;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path workPath = new Path("/tmp/sqoop-parquet-" + context.getTaskAttemptID());
+    lobLoader = new LargeObjectLoader(conf, workPath);
+  }
+
+  @Override
+  protected void map(LongWritable key, SqoopAvroRecord val, Context context)
+      throws IOException, InterruptedException {
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+
+    context.write(val, null);
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
new file mode 100644
index 0000000..a74432a
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetNotFoundException;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import org.kitesdk.data.spi.SchemaValidationUtil;
+
+import java.io.IOException;
+
+/**
+ * Helper class for setting up a Parquet MapReduce job.
+ */
+public final class ParquetJob {
+
+  private ParquetJob() {
+  }
+
+  /**
+   * Configure the import job. The import process will use a Kite dataset to
+   * write data records into Parquet format internally. The input key class is
+   * {@link org.apache.sqoop.lib.SqoopAvroRecord}. The output key is
+   * {@link org.apache.avro.generic.GenericRecord}.
+   */
+  public static void configureImportJob(Configuration conf, Schema schema,
+      String uri, boolean doAppend) throws IOException {
+    Dataset dataset;
+    if (doAppend) {
+      try {
+        dataset = Datasets.load(uri);
+      } catch (DatasetNotFoundException ex) {
+        dataset = createDataset(schema, uri);
+      }
+      Schema writtenWith = dataset.getDescriptor().getSchema();
+      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
+        throw new IOException(
+            String.format("Expected schema: %s%nActual schema: %s",
+                writtenWith, schema));
+      }
+    } else {
+      dataset = createDataset(schema, uri);
+    }
+    DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
+  }
+
+  private static Dataset createDataset(Schema schema, String uri) {
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+        .schema(schema)
+        .format(Formats.PARQUET)
+        .build();
+    return Datasets.create(uri, descriptor, GenericRecord.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/orm/ClassWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 94ff576..4f9dedd 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -30,9 +30,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.SqoopAvroRecord;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
 import com.cloudera.sqoop.SqoopOptions;
@@ -1108,6 +1110,26 @@ public class ClassWriter {
     }
   }
 
+  private void generateSqoopAvroRecordMethods(String className, Schema schema, StringBuilder sb) {
+    // Define shared immutable attributes as static
+    sb.append("  private final static boolean bigDecimalFormatString;\n");
+    sb.append("  private final static Schema schema;\n");
+    sb.append("  static {\n");
+    sb.append("    bigDecimalFormatString = " + bigDecimalFormatString + ";\n");
+    sb.append("    schema = new Schema.Parser().parse(\"");
+    sb.append(schema.toString().replaceAll("\"", "\\\\\""));
+    sb.append("\");\n");
+    sb.append("  }\n");
+    sb.append("  @Override\n");
+    sb.append("  public boolean getBigDecimalFormatString() {\n");
+    sb.append("    return bigDecimalFormatString;\n");
+    sb.append("  }\n");
+    sb.append("  @Override\n");
+    sb.append("  public Schema getSchema() {\n");
+    sb.append("    return schema;\n");
+    sb.append("  }\n");
+  }
+
   /**
    * Generate the setField() method.
    * @param columnTypes - mapping from column names to sql types
@@ -1728,9 +1750,15 @@ public class ClassWriter {
       }
     }
 
+    Schema schema = null;
+    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+      schema = generateAvroSchemaForTable(tableName);
+      options.getConf().set("avro.schema", schema.toString());
+    }
+
     // Generate the Java code.
     StringBuilder sb = generateClassForColumns(columnTypes,
-        cleanedColNames, cleanedDbWriteColNames);
+        cleanedColNames, cleanedDbWriteColNames, schema);
     // Write this out to a file in the jar output directory.
     // We'll move it to the user-visible CodeOutputDir after compiling.
     String codeOutDir = options.getJarOutputDir();
@@ -1788,6 +1816,12 @@ public class ClassWriter {
     }
   }
 
+  private Schema generateAvroSchemaForTable(String tableName) throws IOException {
+    AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
+        connManager, tableName);
+    return generator.generate();
+  }
+
   protected String[] getColumnNames(Map<String, Integer> columnTypes) {
     String [] colNames = options.getColumns();
     if (null == colNames) {
@@ -1838,15 +1872,18 @@ public class ClassWriter {
    * @param colNames - ordered list of column names for table.
    * @param dbWriteColNames - ordered list of column names for the db
    * write() method of the class.
+   * @param schema - If a valid Avro schema is specified, the base class will
+   * be SqoopAvroRecord
    * @return - A StringBuilder that contains the text of the class code.
    */
   private StringBuilder generateClassForColumns(
       Map<String, Integer> columnTypes,
-      String [] colNames, String [] dbWriteColNames) {
+      String [] colNames, String [] dbWriteColNames, Schema schema) {
     if (colNames.length ==0) {
       throw new IllegalArgumentException("Attempted to generate class with "
           + "no columns!");
     }
+
     StringBuilder sb = new StringBuilder();
     sb.append("// ORM class for table '" + tableName + "'\n");
     sb.append("// WARNING: This class is AUTO-GENERATED. "
@@ -1878,7 +1915,13 @@ public class ClassWriter {
     sb.append("import " + BlobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + ClobRef.class.getCanonicalName() + ";\n");
     sb.append("import " + LargeObjectLoader.class.getCanonicalName() + ";\n");
-    sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
+
+    Class baseClass = SqoopRecord.class;
+    if (null != schema) {
+      sb.append("import org.apache.avro.Schema;\n");
+      baseClass = SqoopAvroRecord.class;
+    }
+    sb.append("import " + baseClass.getCanonicalName() + ";\n");
     sb.append("import java.sql.PreparedStatement;\n");
     sb.append("import java.sql.ResultSet;\n");
     sb.append("import java.sql.SQLException;\n");
@@ -1898,8 +1941,8 @@ public class ClassWriter {
     sb.append("\n");
 
     String className = tableNameInfo.getShortClassForTable(tableName);
-    sb.append("public class " + className + " extends SqoopRecord "
-        + " implements DBWritable, Writable {\n");
+    sb.append("public class " + className + " extends " + baseClass.getSimpleName()
+          + " implements DBWritable, Writable {\n");
     sb.append("  private final int PROTOCOL_VERSION = "
         + CLASS_WRITER_VERSION + ";\n");
     sb.append(
@@ -1918,6 +1961,10 @@ public class ClassWriter {
     generateGetFieldMap(columnTypes, colNames, sb);
     generateSetField(columnTypes, colNames, sb);
 
+    if (baseClass == SqoopAvroRecord.class) {
+      generateSqoopAvroRecordMethods(className, schema, sb);
+    }
+
     // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
     // WritableComparable
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index b77b1ea..26950cc 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -98,6 +98,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String FMT_SEQUENCEFILE_ARG = "as-sequencefile";
   public static final String FMT_TEXTFILE_ARG = "as-textfile";
   public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
+  public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
   public static final String HIVE_IMPORT_ARG = "hive-import";
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_DATABASE_ARG = "hive-database";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index a3a2d0d..54e618e 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -708,6 +708,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         .withDescription("Imports data to Avro data files")
         .withLongOpt(FMT_AVRODATAFILE_ARG)
         .create());
+    importOpts.addOption(OptionBuilder
+        .withDescription("Imports data to Parquet files")
+        .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
+        .create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
@@ -923,6 +927,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
       }
 
+      if (in.hasOption(FMT_PARQUETFILE_ARG)) {
+        out.setFileLayout(SqoopOptions.FileLayout.ParquetFile);
+      }
+
       if (in.hasOption(NUM_MAPPERS_ARG)) {
         out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
       }
@@ -1020,8 +1028,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
             && options.getFileLayout() != SqoopOptions.FileLayout.TextFile
             && options.getConnectString().contains("jdbc:mysql://")) {
       throw new InvalidOptionsException(
-            "MySQL direct import currently supports only text output format."
-             + "Parameters --as-sequencefile and --as-avrodatafile are not "
+            "MySQL direct import currently supports only text output format. "
+             + "Parameters --as-sequencefile --as-avrodatafile and --as-parquetfile are not "
              + "supported with --direct params in MySQL case.");
     } else if (options.isDirect()
             && options.doHiveDropDelims()) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/java/org/apache/sqoop/util/AppendUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/AppendUtils.java b/src/java/org/apache/sqoop/util/AppendUtils.java
index 5eaaa95..b6bbc18 100644
--- a/src/java/org/apache/sqoop/util/AppendUtils.java
+++ b/src/java/org/apache/sqoop/util/AppendUtils.java
@@ -228,8 +228,17 @@ public class AppendUtils {
         } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
 
         LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
-      } else {    // ignore everything else
-        LOG.debug("Filename: " + sourceFilename + " ignored");
+      } else {
+        // Generated Parquet files do not follow the pattern "part-m-([0-9]{5}).ext", so that these
+        // files cannot be moved to target directory expectedly. We simply check file extension.
+        boolean fileMoved = false;
+        if (sourceFilename.endsWith(".parquet")) {
+          Path targetFilename = new Path(targetDir, sourceFilename.toString());
+          fileMoved = fs.rename(fileStatus.getPath(), targetFilename);
+        }
+        if (!fileMoved) {    // ignore everything else
+          LOG.debug("Filename: " + sourceFilename + " ignored");
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/licenses/LICENSE-BIN.txt
----------------------------------------------------------------------
diff --git a/src/licenses/LICENSE-BIN.txt b/src/licenses/LICENSE-BIN.txt
index 4215d26..8cec1ba 100644
--- a/src/licenses/LICENSE-BIN.txt
+++ b/src/licenses/LICENSE-BIN.txt
@@ -372,6 +372,18 @@ For lib/avro-<version>.jar:
 
   The Apache License, Version 2.0
 
+For lib/kite-data-core-<version>.jar:
+
+  The Apache License, Version 2.0
+
+For lib/kite-data-mapreduce-<version>.jar:
+
+  The Apache License, Version 2.0
+
+For lib/kite-hadoop-compatibility-<version>.jar:
+
+  The Apache License, Version 2.0
+
 For lib/avro-ipc-<version>.jar:
 
   The Apache License, Version 2.0

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2e1e0942/src/test/com/cloudera/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
new file mode 100644
index 0000000..2224719
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.Datasets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests --as-parquetfile.
+ */
+public class TestParquetImport extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestParquetImport.class.getName());
+
+  /**
+   * Create the argv to pass to Sqoop.
+   *
+   * @return the argv as an array of strings.
+   */
+  protected String[] getOutputArgv(boolean includeHadoopFlags,
+          String[] extraArgs) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add("INTFIELD1");
+    args.add("--as-parquetfile");
+    if (extraArgs != null) {
+      args.addAll(Arrays.asList(extraArgs));
+    }
+
+    return args.toArray(new String[args.size()]);
+  }
+
+  public void testParquetImport() throws IOException {
+    String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
+        "VARBINARY(2)",};
+    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
+    createTableWithColTypes(types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    Schema schema = getSchema();
+    assertEquals(Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+    checkField(fields.get(0), "DATA_COL0", Type.BOOLEAN);
+    checkField(fields.get(1), "DATA_COL1", Type.INT);
+    checkField(fields.get(2), "DATA_COL2", Type.LONG);
+    checkField(fields.get(3), "DATA_COL3", Type.FLOAT);
+    checkField(fields.get(4), "DATA_COL4", Type.DOUBLE);
+    checkField(fields.get(5), "DATA_COL5", Type.STRING);
+    checkField(fields.get(6), "DATA_COL6", Type.BYTES);
+
+    DatasetReader<GenericRecord> reader = getReader();
+    try {
+      GenericRecord record1 = reader.next();
+      //assertNull(record1);
+      assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+      assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+      assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+      assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+      assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+      assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
+      Object object = record1.get("DATA_COL6");
+      assertTrue(object instanceof ByteBuffer);
+      ByteBuffer b = ((ByteBuffer) object);
+      assertEquals((byte) 1, b.get(0));
+      assertEquals((byte) 2, b.get(1));
+    } finally {
+      reader.close();
+    }
+  }
+
+  public void testOverrideTypeMapping() throws IOException {
+    String [] types = { "INT" };
+    String [] vals = { "10" };
+    createTableWithColTypes(types, vals);
+
+    String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
+    runImport(getOutputArgv(true, extraArgs));
+
+    Schema schema = getSchema();
+    assertEquals(Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+    checkField(fields.get(0), "DATA_COL0", Type.STRING);
+
+    DatasetReader<GenericRecord> reader = getReader();
+    try {
+      GenericRecord record1 = reader.next();
+      assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
+    } finally {
+      reader.close();
+    }
+  }
+
+  public void testFirstUnderscoreInColumnName() throws IOException {
+    String [] names = { "_NAME" };
+    String [] types = { "INT" };
+    String [] vals = { "1987" };
+    createTableWithColTypesAndNames(names, types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    Schema schema = getSchema();
+    assertEquals(Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+    checkField(fields.get(0), "__NAME", Type.INT);
+
+    DatasetReader<GenericRecord> reader = getReader();
+    try {
+      GenericRecord record1 = reader.next();
+      assertEquals("__NAME", 1987, record1.get("__NAME"));
+    } finally {
+      reader.close();
+    }
+  }
+
+  public void testNullableParquetImport() throws IOException, SQLException {
+    String [] types = { "INT" };
+    String [] vals = { null };
+    createTableWithColTypes(types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    DatasetReader<GenericRecord> reader = getReader();
+    try {
+      GenericRecord record1 = reader.next();
+      assertNull(record1.get("DATA_COL0"));
+    } finally {
+      reader.close();
+    }
+  }
+
+  private Schema getSchema() {
+    return getDataset().getDescriptor().getSchema();
+  }
+
+  private DatasetReader<GenericRecord> getReader() {
+    return getDataset().newReader();
+  }
+
+  private Dataset<GenericRecord> getDataset() {
+    String uri = "dataset:file:" + getTablePath();
+    return Datasets.load(uri, GenericRecord.class);
+  }
+
+  private void checkField(Field field, String name, Type type) {
+    assertEquals(name, field.name());
+    assertEquals(Type.UNION, field.schema().getType());
+    assertEquals(type, field.schema().getTypes().get(0).getType());
+    assertEquals(Type.NULL, field.schema().getTypes().get(1).getType());
+  }
+
+}