Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/07 16:34:36 UTC

[3/3] git commit: SQOOP-931: Integrate HCatalog with Sqoop

SQOOP-931: Integrate HCatalog with Sqoop

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5e88d43b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5e88d43b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5e88d43b

Branch: refs/heads/trunk
Commit: 5e88d43b5af024c1b9fd82029f7de4c325dcf009
Parents: b07906a
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 7 07:33:21 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 7 07:33:21 2013 -0700

----------------------------------------------------------------------
 bin/configure-sqoop                                |   30 +
 bin/configure-sqoop.cmd                            |    9 +
 build.xml                                          |   21 +-
 ivy.xml                                            |   18 +-
 ivy/ivysettings.xml                                |   15 +-
 src/docs/user/SqoopUserGuide.txt                   |    2 +
 src/docs/user/hcatalog.txt                         |  313 ++++
 src/java/org/apache/sqoop/SqoopOptions.java        |  107 ++-
 .../sqoop/config/ConfigurationConstants.java       |   17 +
 src/java/org/apache/sqoop/hive/HiveImport.java     |   17 +
 src/java/org/apache/sqoop/manager/ConnManager.java |   64 +
 .../sqoop/mapreduce/DataDrivenImportJob.java       |   16 +
 .../org/apache/sqoop/mapreduce/ExportJobBase.java  |   20 +-
 .../org/apache/sqoop/mapreduce/ImportJobBase.java  |   13 +
 .../org/apache/sqoop/mapreduce/JdbcExportJob.java  |   13 +-
 src/java/org/apache/sqoop/mapreduce/JobBase.java   |    4 +-
 .../mapreduce/hcat/SqoopHCatExportFormat.java      |  138 ++
 .../mapreduce/hcat/SqoopHCatExportMapper.java      |  349 +++++
 .../mapreduce/hcat/SqoopHCatImportMapper.java      |  343 ++++
 .../sqoop/mapreduce/hcat/SqoopHCatInputSplit.java  |  109 ++
 .../mapreduce/hcat/SqoopHCatRecordReader.java      |  153 ++
 .../sqoop/mapreduce/hcat/SqoopHCatUtilities.java   | 1215 +++++++++++++++
 src/java/org/apache/sqoop/tool/BaseSqoopTool.java  |  231 +++-
 src/java/org/apache/sqoop/tool/CodeGenTool.java    |    3 +
 src/java/org/apache/sqoop/tool/ExportTool.java     |    9 +-
 src/java/org/apache/sqoop/tool/ImportTool.java     |   14 +-
 src/perftest/ExportStressTest.java                 |    2 +-
 src/test/com/cloudera/sqoop/ThirdPartyTests.java   |    7 +
 .../com/cloudera/sqoop/hive/TestHiveImport.java    |   10 +
 .../cloudera/sqoop/testutil/BaseSqoopTestCase.java |    2 +-
 .../cloudera/sqoop/testutil/ExportJobTestCase.java |   14 +-
 .../org/apache/sqoop/hcat/HCatalogExportTest.java  |  377 +++++
 .../org/apache/sqoop/hcat/HCatalogImportTest.java  |  712 +++++++++
 .../org/apache/sqoop/hcat/HCatalogTestUtils.java   |  855 ++++++++++
 .../org/apache/sqoop/hcat/TestHCatalogBasic.java   |  251 +++
 testdata/hcatalog/conf/hive-log4j.properties       |   87 +
 testdata/hcatalog/conf/hive-site.xml               |   26 +
 testdata/hcatalog/conf/log4j.properties            |   55 +
 38 files changed, 5596 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 61ff3f2..178720d 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -54,9 +54,22 @@ if [ -z "${HADOOP_MAPRED_HOME}" ]; then
     HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
   fi
 fi
+
+# We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+# so that hcat script works correctly on BigTop
+if [ -z "${HADOOP_HOME}" ]; then
+  if [ -n "${HADOOP_COMMON_HOME}" ]; then
+     HADOOP_HOME=${HADOOP_COMMON_HOME}
+     export HADOOP_HOME
+  fi
+fi
+
 if [ -z "${HBASE_HOME}" ]; then
   HBASE_HOME=/usr/lib/hbase
 fi
+if [ -z "${HCAT_HOME}" ]; then
+  HCAT_HOME=/usr/lib/hcatalog
+fi
 
 # Check: If we can't find our dependencies, give up here.
 if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -76,6 +89,12 @@ if [ ! -d "${HBASE_HOME}" ]; then
   echo 'Please set $HBASE_HOME to the root of your HBase installation.'
 fi
 
+## Moved to be a runtime check in sqoop.
+if [ ! -d "${HCAT_HOME}" ]; then
+  echo "Warning: $HCAT_HOME does not exist! HCatalog jobs will fail."
+  echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
+fi
+
 # Where to find the main Sqoop jar
 SQOOP_JAR_DIR=$SQOOP_HOME
 
@@ -106,6 +125,15 @@ if [ -e "$HBASE_HOME/bin/hbase" ]; then
   SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
 fi
 
+# Add HCatalog to dependency list
+if [ -e "${HCAT_HOME}/bin/hcat" ]; then
+  TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`${HCAT_HOME}/bin/hcat -classpath`
+  if [ -n "${HIVE_CONF_DIR}" ]; then
+    TMP_SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}:${HIVE_CONF_DIR}
+  fi
+  SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
+fi
+
 ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
 if [ -d "${ZOOCFGDIR}" ]; then
   SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -136,4 +164,6 @@ export HADOOP_CLASSPATH
 export HADOOP_COMMON_HOME
 export HADOOP_MAPRED_HOME
 export HBASE_HOME
+export HCAT_HOME
+export HIVE_CONF_DIR
 

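For reference, a minimal sketch (not part of the patch) of how the wiring above
is exercised when HCatalog lives somewhere other than the default
/usr/lib/hcatalog; the paths below are placeholders:

  # configure-sqoop picks up `hcat -classpath` output when ${HCAT_HOME}/bin/hcat
  # exists, and exports HCAT_HOME and HIVE_CONF_DIR for the rest of the job.
  export HCAT_HOME=/opt/hcatalog
  export HIVE_CONF_DIR=/etc/hive/conf
  bin/sqoop help
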
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop.cmd
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop.cmd b/bin/configure-sqoop.cmd
index f5fd608..ec57e37 100644
--- a/bin/configure-sqoop.cmd
+++ b/bin/configure-sqoop.cmd
@@ -55,6 +55,15 @@ if not defined HADOOP_MAPRED_HOME (
     exit /b 1
   )
 )
+
+:: We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+:: so that hcat script works correctly on BigTop
+if not defined HADOOP_HOME (
+  if defined HADOOP_COMMON_HOME (
+    set HADOOP_HOME=%HADOOP_COMMON_HOME%
+  )
+)
+
 :: Check for HBase dependency
 if not defined HBASE_HOME (
   if defined HBASE_VERSION (

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 636c103..b4b08e5 100644
--- a/build.xml
+++ b/build.xml
@@ -51,6 +51,7 @@
       <property name="hbase.version" value="0.90.3-cdh3u1" />
       <property name="zookeeper.version" value="3.3.3-cdh3u1" />
       <property name="hadoop.version.full" value="0.20" />
+      <property name="hcatalog.version" value="0.11.0" />
     </then>
 
     <elseif>
@@ -60,6 +61,7 @@
         <property name="hbase.version" value="0.92.0" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="0.23" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
@@ -70,6 +72,7 @@
         <property name="hbase.version" value="0.92.0" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="1.0.0" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
@@ -80,6 +83,7 @@
         <property name="hbase.version" value="0.94.2" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="2.0.4-alpha" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
@@ -600,6 +604,7 @@
       <tarfileset dir="${build.dir}" mode="755">
         <include name="${bin.artifact.name}/bin/*" />
         <include name="${bin.artifact.name}/testdata/hive/bin/*" />
+        <include name="${bin.artifact.name}/testdata/hcatalog/conf/*" />
         <include name="${bin.artifact.name}/**/*.sh" />
       </tarfileset>
     </tar>
@@ -643,12 +648,14 @@
       <tarfileset dir="${build.dir}" mode="664">
         <exclude name="${src.artifact.name}/bin/*" />
         <exclude name="${src.artifact.name}/testdata/hive/bin/*" />
+        <exclude name="${src.artifact.name}/testdata/hcatalog/conf/*" />
         <exclude name="${src.artifact.name}/**/*.sh" />
         <include name="${src.artifact.name}/**" />
       </tarfileset>
       <tarfileset dir="${build.dir}" mode="755">
         <include name="${src.artifact.name}/bin/*" />
         <include name="${src.artifact.name}/testdata/hive/bin/*" />
+        <include name="${src.artifact.name}/testdata/hcatalog/conf/*" />
         <include name="${src.artifact.name}/**/*.sh" />
       </tarfileset>
     </tar>
@@ -658,6 +665,9 @@
   <target name="test-prep" depends="test-prep-normal,test-prep-thirdparty,
                                     test-prep-manual"/>
 
+  <path id="hcatalog.conf.dir">
+     <pathelement location="${basedir}/testdata/hcatalog/conf"/>
+  </path>
   <target name="test-eval-condition">
     <condition property="thirdparty_or_manual">
       <or>
@@ -667,6 +677,8 @@
     </condition>
   </target>
 
+
+
   <target name="test-prep-normal" unless="thirdparty_or_manual"
                                   depends="test-eval-condition">
     <!-- Set this to run all the "standard" tests -->
@@ -712,7 +724,7 @@
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
     <delete dir="${build.test}/data"/>
-    <mkdir dir="${build.test}/data" />
+    <mkdir dir="${build.test}/data/sqoop" />
     <mkdir dir="${cobertura.class.dir}" />
     <junit
       printsummary="yes" showoutput="${test.output}"
@@ -803,10 +815,17 @@
       <sysproperty key="java.security.krb5.kdc"
                    value="${java.security.krb5.kdc}"/>
 
+      <!-- Location of Hive logs -->
+      <!--<sysproperty key="hive.log.dir"
+                   value="${test.build.data}/sqoop/logs"/> -->
+
       <classpath>
         <!-- instrumented classes go ahead of normal classes -->
         <pathelement location="${cobertura.class.dir}" />
 
+        <!-- Location of hive-site xml and other hadoop config files -->
+        <path refid="hcatalog.conf.dir" />
+
         <!-- main classpath here. -->
         <path refid="test.classpath" />
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1fa4dd1..750adfc 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -37,10 +37,15 @@ under the License.
       extends="runtime"
       description="artifacts needed to compile/test the application"/>
     <conf name="hbase" visibility="private" />
-    <conf name="hadoop23" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop20" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop100" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop200" visibility="private" extends="common,runtime,hbase" />
+    <conf name="hcatalog" visibility="private" />
+    <conf name="hadoop23" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop20" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop100" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop200" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
 
     <conf name="test" visibility="private" extends="common,runtime"/>
     <conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -172,6 +177,11 @@ under the License.
       <exclude org="com.cloudera.cdh" module="zookeeper-ant" />
     </dependency>
 
+    <dependency org="org.apache.hcatalog" name="hcatalog-core"
+      rev="${hcatalog.version}" conf="hcatalog->default">
+      <artifact name="hcatalog-core" type="jar"/>
+    </dependency>
+
     <exclude org="org.apache.hadoop" module="avro"/>
     <exclude org="commons-daemon" module="commons-daemon" />
     <exclude type="pom" />

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy/ivysettings.xml
----------------------------------------------------------------------
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index c4cc561..2920c89 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -42,6 +42,9 @@ under the License.
   <property name="releases.cloudera.com"
       value="https://repository.cloudera.com/content/repositories/releases/"
       override="false"/>
+  <property name="www.datanucleus.org"
+      value="http://www.datanucleus.org/downloads/maven2/"
+      override="false"/>
   <property name="maven2.pattern"
       value="[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier])"/>
   <property name="repo.dir" value="${user.home}/.m2/repository"/>
@@ -52,6 +55,8 @@ under the License.
   <resolvers>
     <ibiblio name="maven2" root="${repo.maven.org}"
         pattern="${maven2.pattern.ext}" m2compatible="true"/>
+    <ibiblio name="datanucleus" root="${www.datanucleus.org}"
+        pattern="${maven2.pattern.ext}" m2compatible="true"/>
     <ibiblio name="cloudera-releases" root="${releases.cloudera.com}"
         pattern="${maven2.pattern.ext}" m2compatible="true"/>
     <ibiblio name="apache-snapshot" root="${snapshot.apache.org}"
@@ -67,16 +72,18 @@ under the License.
     <chain name="default" dual="true" checkmodified="true"
         changingPattern=".*SNAPSHOT">
       <resolver ref="fs"/>
-      <resolver ref="apache-snapshot"/> 
+      <resolver ref="apache-snapshot"/>
+      <resolver ref="datanucleus"/>
       <resolver ref="cloudera-releases"/>
-      <resolver ref="cloudera-staging"/> 
+      <resolver ref="cloudera-staging"/>
       <resolver ref="maven2"/>
     </chain>
 
     <chain name="internal" dual="true">
       <resolver ref="fs"/>
-      <resolver ref="apache-snapshot"/> 
-      <resolver ref="cloudera-staging"/> 
+      <resolver ref="apache-snapshot"/>
+      <resolver ref="datanucleus"/>
+      <resolver ref="cloudera-staging"/>
       <resolver ref="maven2"/>
     </chain>
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/SqoopUserGuide.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt
index 01ac1cf..2e88887 100644
--- a/src/docs/user/SqoopUserGuide.txt
+++ b/src/docs/user/SqoopUserGuide.txt
@@ -72,6 +72,8 @@ include::help.txt[]
 
 include::version.txt[]
 
+include::hcatalog.txt[]
+
 include::compatibility.txt[]
 
 include::connectors.txt[]

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/hcatalog.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
new file mode 100644
index 0000000..b8e495e
--- /dev/null
+++ b/src/docs/user/hcatalog.txt
@@ -0,0 +1,313 @@
+
+////
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
+Sqoop-HCatalog Integration
+--------------------------
+
+HCatalog Background
+~~~~~~~~~~~~~~~~~~~
+
+HCatalog is a table and storage management service for Hadoop that enables
+users with different data processing tools – Pig, MapReduce, and Hive –
+to more easily read and write data on the grid. HCatalog’s table abstraction
+presents users with a relational view of data in the Hadoop distributed
+file system (HDFS) and ensures that users need not worry about where or
+in what format their data is stored: RCFile format, text files, or
+SequenceFiles.
+
+HCatalog supports reading and writing files in any format for which a Hive
+SerDe (serializer-deserializer) has been written. By default, HCatalog
+supports RCFile, CSV, JSON, and SequenceFile formats. To use a custom
+format, you must provide the InputFormat and OutputFormat as well as the SerDe.
+
+HCatalog’s ability to abstract various storage formats is what provides
+RCFile (and future file format) support to Sqoop.
+
+Exposing HCatalog Tables to Sqoop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog integration with Sqoop is patterned on an existing feature set that
+supports Avro and Hive tables. Five new command line options are introduced,
+and some command line options defined for Hive are reused.
+
+New Command Line Options
+^^^^^^^^^^^^^^^^^^^^^^^^
+
++--hcatalog-database+::
+Specifies the database name for the HCatalog table. If not specified,
+the default database name +default+ is used. Providing the
++--hcatalog-database+ option without +--hcatalog-table+ is an error.
+This is not a required option.
+
++--hcatalog-table+::
+The argument value for this option is the HCatalog tablename.
+The presence of the +--hcatalog-table+ option signifies that the import
+or export job is done using HCatalog tables, and it is a required option for
+HCatalog jobs.
+
++--hcatalog-home+::
+The home directory for the HCatalog installation. The directory is
+expected to have a +lib+ subdirectory and a +share/hcatalog+ subdirectory
+with the necessary HCatalog libraries. If not specified, the system property
++hcatalog.home+ is checked and, failing that, the environment variable
++HCAT_HOME+ is checked. If neither is set, the default value
++/usr/lib/hcatalog+ is used.
+This is not a required option.
+
++--create-hcatalog-table+::
+
+This option specifies whether an HCatalog table should be created
+automatically when importing data. By default, HCatalog tables are assumed
+to exist. The table name will be the same as the database table name
+translated to lower case. Further described in +Automatic Table Creation+
+below.
+
++--hcatalog-storage-stanza+::
+
+This option specifies the storage stanza to be appended to the table.
+Further described in +Automatic Table Creation+ below.
+
+Supported Sqoop Hive Options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following Sqoop options are also used along with the +--hcatalog-table+
+option to provide additional input to the HCatalog jobs. Some of the existing
+Hive import job options are reused with HCatalog jobs instead of creating
+HCatalog-specific options for the same purpose.
+
++--map-column-hive+::
+This option maps a database column to a specific HCatalog type.
+
++--hive-home+::
+The Hive home location.
+
++--hive-partition-key+::
+Used for static partitioning filter. The partitioning key should be of
+type STRING. There can be only one static partitioning key.
+
++--hive-partition-value+::
+The value associated with the partition.
+
+Unsupported Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Unsupported Sqoop Hive Import Options
++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop Hive import options are not supported with HCatalog jobs.
+
+* +--hive-import+
+* +--hive-overwrite+
+
+Unsupported Sqoop Export and Import Options
++++++++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop export and import options are not supported with HCatalog jobs.
+
+* +--direct+
+* +--export-dir+
+* +--target-dir+
+* +--warehouse-dir+
+* +--append+
+* +--as-sequencefile+
+* +--as-avrodatafile+
+
+Ignored Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^
+
+The following options are ignored with HCatalog jobs.
+
+* All input delimiter options are ignored.
+
+* Output delimiters are generally ignored unless either
++--hive-drop-import-delims+ or +--hive-delims-replacement+ is used. When the
++--hive-drop-import-delims+ or +--hive-delims-replacement+ option is
+specified, all +CHAR+ type database table columns will be post-processed
+to either remove or replace the delimiters, respectively. See +Delimited Text
+Formats and Field and Line Delimiter Characters+ below. This is only needed
+if the HCatalog table uses text formats.
+
+Automatic Table Creation
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the key features of Sqoop is to manage and create the table metadata
+when importing into Hadoop. HCatalog import jobs also provide for this
+feature with the option +--create-hcatalog-table+. Furthermore, one of the
+important benefits of the HCatalog integration is to provide storage
+agnosticism to Sqoop data movement jobs. To provide for that feature,
+HCatalog import jobs provide an option that lets a user specify the
+storage format for the created table.
+
+The option +--create-hcatalog-table+ is used as an indicator that a table
+has to be created as part of the HCatalog import job.  If the option 
++--create-hcatalog-table+ is specified and the table exists, then the
+table creation will fail and the job will be aborted.
+
+The option +--hcatalog-storage-stanza+ can be used to specify the storage
+format of the newly created table. The default value for this option is
++stored as rcfile+. The value specified for this option is assumed to be a
+valid Hive storage format expression. It will be appended to the +create table+
+command generated by the HCatalog import job as part of automatic table
+creation. Any error in the storage stanza will cause the table creation to
+fail and the import job will be aborted.
+
+Any additional resources needed to support the storage format referenced in
+the option +--hcatalog-storage-stanza+ should be provided to the job either
+by placing them in +$HIVE_HOME/lib+ or by providing them in +HADOOP_CLASSPATH+
+and +LIBJAR+ files.
+
+If the option +--hive-partition-key+ is specified, then the value of this
+option is used as the partitioning key for the newly created table. Only
+one partitioning key can be specified with this option.
+
+Object names are mapped to the lowercase equivalents as specified below
+when mapped to an HCatalog table. This includes the table name (which
+is the same as the external store table name converted to lower case)
+and field names.
+
+Delimited Text Formats and Field and Line Delimiter Characters
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog supports delimited text format as one of the table storage formats.
+But when delimited text is used and the imported data has fields that contain
+those delimiters, then the data may be parsed into a different number of
+fields and records by Hive, thereby losing data fidelity.
+
+For this case, one of these existing Sqoop import options can be used:
+
+* +--hive-delims-replacement+
+
+* +--hive-drop-import-delims+
+
+If either of these options is provided for import, then any column of type
+STRING will be formatted with the Hive delimiter processing and then written
+to the HCatalog table.
+
+HCatalog Table Requirements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The HCatalog table should be created before using it as part of a Sqoop job
+if the default table creation options (with optional storage stanza) are not
+sufficient. All storage formats supported by HCatalog can be used with the
+creation of the HCatalog tables. This allows the feature to readily adopt
+new storage formats as they are added to the Hive project, such as ORC files.
+
+Support for Partitioning
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Sqoop HCatalog feature supports the following table types:
+
+* Unpartitioned tables
+
+* Partitioned tables with a static partitioning key specified
+
+* Partitioned tables with dynamic partition keys from the database
+result set
+
+* Partitioned tables with a combination of a static key and additional
+dynamic partitioning keys
+
+Schema Mapping
+~~~~~~~~~~~~~~
+
+Sqoop currently does not support column name mapping. However, the user
+is allowed to override the type mapping. Type mapping loosely follows
+the Hive type mapping already present in Sqoop except that SQL types
+“FLOAT” and “REAL” are mapped to HCatalog type “float”. In the Sqoop type
+mapping for Hive, these two are mapped to “double”. Type mapping is used
+primarily for checking column definition correctness and can be overridden
+with the +--map-column-hive+ option.
+
+All types except binary are assignable to a String type.
+
+Any field of number type (int, smallint, tinyint, bigint and bigdecimal,
+float and double) is assignable to another field of any number type during
+exports and imports. Depending on the precision and scale of the target type
+of assignment, truncations can occur.
+
+Furthermore, date/time/timestamps are mapped to string (the full
+date/time/timestamp representation) or bigint (the number of milliseconds
+since epoch) during imports and exports.
+
+BLOBs and CLOBs are only supported for imports. The BLOB/CLOB objects when
+imported are stored in a Sqoop-specific format and knowledge of this format
+is needed for processing these objects in a Pig/Hive job or another Map Reduce
+job.
+
+Database column names are mapped to their lowercase equivalents when mapped
+to the HCatalog fields. Currently, case-sensitive database object names are
+not supported.
+
+Projection of a set of columns from a table to an HCatalog table or loading
+to a column projection is allowed, subject to table constraints. The dynamic
+partitioning columns, if any, must be part of the projection when importing
+data into HCatalog tables.
+
+Dynamic partitioning fields should be mapped to database columns that are
+defined with the NOT NULL attribute (although this is not validated). A null
+value during import for a dynamic partitioning column will abort the Sqoop
+job.
+
+Support for HCatalog Data Types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+All the primitive HCatalog types are supported. Currently all the complex
+HCatalog types are unsupported.
+
+BLOB/CLOB database types are only supported for imports.
+
+Providing Hive and HCatalog Libraries for the Sqoop Job
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the support for HCatalog added to Sqoop, any HCatalog job depends on a
+set of jar files being available both on the Sqoop client host and where the
+Map/Reduce tasks run. To run HCatalog jobs, the environment variable
++HADOOP_CLASSPATH+ must be set up as shown below before launching the Sqoop
+HCatalog jobs.
+
++HADOOP_CLASSPATH=$(hcat -classpath)+
++export HADOOP_CLASSPATH+
+
+
+The necessary HCatalog dependencies will be copied to the distributed cache
+automatically by the Sqoop job.
+
+Examples
+~~~~~~~~
+
+Create an HCatalog table, such as:
+
++hcat -e "create table txn(txn_date string, amount float,
+store_id int) partitioned by (cust_id string) stored as rcfile;"+
+
+
+Then Sqoop import and export of the "txn" HCatalog table can be invoked as
+follows:
+
+Import
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+
+
+Export
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop export --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+

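A hedged sketch tying the documented options together (not part of the patch;
the JDBC URL, database name, and table names below are placeholders): an import
that creates the HCatalog table automatically, with an explicit storage stanza
and a static partition key.

  $SQOOP_HOME/bin/sqoop import \
    --connect jdbc:mysql://db.example.com/sales \
    --table TXN \
    --hcatalog-database default \
    --hcatalog-table txn \
    --create-hcatalog-table \
    --hcatalog-storage-stanza "stored as rcfile" \
    --hive-partition-key cust_id \
    --hive-partition-value C1

HADOOP_CLASSPATH should be set from "hcat -classpath" beforehand, as described
in the "Providing Hive and HCatalog Libraries for the Sqoop Job" section above.
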
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f18d43e..4be6a6a 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -59,6 +59,10 @@ public class SqoopOptions implements Cloneable {
   public static final String METASTORE_PASSWORD_KEY =
       "sqoop.metastore.client.record.password";
 
+  // Default hive and hcat locations.
+  public static final String DEF_HIVE_HOME = "/usr/lib/hive";
+  public static final String DEF_HCAT_HOME = "/usr/lib/hcatalog";
+
   public static final boolean METASTORE_PASSWORD_DEFAULT = false;
 
   /**
@@ -151,6 +155,15 @@ public class SqoopOptions implements Cloneable {
   private String hiveDelimsReplacement;
   @StoredAsProperty("hive.partition.key") private String hivePartitionKey;
   @StoredAsProperty("hive.partition.value") private String hivePartitionValue;
+  @StoredAsProperty("hcatalog.table.name")
+  private String hCatTableName;
+  @StoredAsProperty("hcatalog.database.name")
+  private String hCatDatabaseName;
+  @StoredAsProperty("hcatalog.create.table")
+  private boolean hCatCreateTable;
+  @StoredAsProperty("hcatalog.storage.stanza")
+  private String hCatStorageStanza;
+  private String hCatHome; // not serialized to metastore.
 
   // User explicit mapping of types
   private Properties mapColumnJava; // stored as map.colum.java
@@ -197,7 +210,9 @@ public class SqoopOptions implements Cloneable {
 
   private DelimiterSet inputDelimiters; // codegen.input.delimiters.
   private DelimiterSet outputDelimiters; // codegen.output.delimiters.
-  private boolean areDelimsManuallySet;
+
+  private boolean areOutputDelimsManuallySet;
+  private boolean areInputDelimsManuallySet;
 
   private Configuration conf;
 
@@ -580,7 +595,8 @@ public class SqoopOptions implements Cloneable {
 
     // Delimiters were previously memoized; don't let the tool override
     // them with defaults.
-    this.areDelimsManuallySet = true;
+    this.areOutputDelimsManuallySet = true;
+    this.areInputDelimsManuallySet = true;
 
     // If we loaded true verbose flag, we need to apply it
     if (this.verbose) {
@@ -804,7 +820,21 @@ public class SqoopOptions implements Cloneable {
   public static String getHiveHomeDefault() {
     // Set this with $HIVE_HOME, but -Dhive.home can override.
     String hiveHome = System.getenv("HIVE_HOME");
-    return System.getProperty("hive.home", hiveHome);
+    hiveHome = System.getProperty("hive.home", hiveHome);
+    if (hiveHome == null) {
+      hiveHome = DEF_HIVE_HOME;
+    }
+    return hiveHome;
+  }
+
+  public static String getHCatHomeDefault() {
+    // Set this with $HCAT_HOME, but -Dhcatalog.home can override.
+    String hcatHome = System.getenv("HCAT_HOME");
+    hcatHome = System.getProperty("hcat.home", hcatHome);
+    if (hcatHome == null) {
+      hcatHome = DEF_HCAT_HOME;
+    }
+    return hcatHome;
   }
 
   private void initDefaults(Configuration baseConfiguration) {
@@ -813,6 +843,7 @@ public class SqoopOptions implements Cloneable {
     this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
 
     this.hiveHome = getHiveHomeDefault();
+    this.hCatHome = getHCatHomeDefault();
 
     this.inputDelimiters = new DelimiterSet(
         DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR,
@@ -834,7 +865,8 @@ public class SqoopOptions implements Cloneable {
     this.jarDirIsAuto = true;
     this.layout = FileLayout.TextFile;
 
-    this.areDelimsManuallySet = false;
+    this.areOutputDelimsManuallySet = false;
+    this.areInputDelimsManuallySet = false;
 
     this.numMappers = DEFAULT_NUM_MAPPERS;
     this.useCompression = false;
@@ -1263,6 +1295,47 @@ public class SqoopOptions implements Cloneable {
     this.failIfHiveTableExists = fail;
   }
 
+  // HCatalog support
+  public void setHCatTableName(String ht) {
+    this.hCatTableName = ht;
+  }
+
+  public String getHCatTableName() {
+    return this.hCatTableName;
+  }
+
+  public void setHCatDatabaseName(String hd) {
+    this.hCatDatabaseName = hd;
+  }
+
+  public String getHCatDatabaseName() {
+    return this.hCatDatabaseName;
+  }
+
+
+  public String getHCatHome() {
+    return hCatHome;
+  }
+
+  public void setHCatHome(String home) {
+    this.hCatHome = home;
+  }
+
+  public boolean doCreateHCatalogTable() {
+    return hCatCreateTable;
+  }
+
+  public void setCreateHCatalogTable(boolean create) {
+    this.hCatCreateTable = create;
+  }
+
+  public void setHCatStorageStanza(String stanza) {
+    this.hCatStorageStanza = stanza;
+  }
+
+  public String getHCatStorageStanza() {
+    return this.hCatStorageStanza;
+  }
   /**
    * @return location where .java files go; guaranteed to end with '/'.
    */
@@ -1673,18 +1746,32 @@ public class SqoopOptions implements Cloneable {
     this.fetchSize = size;
   }
 
+  /*
+   * @return true if the output delimiters have been explicitly set by the user
+   */
+  public boolean explicitOutputDelims() {
+    return areOutputDelimsManuallySet;
+  }
+
   /**
-   * @return true if the delimiters have been explicitly set by the user.
+   * Flag the output delimiter settings as explicit user settings, or implicit.
    */
-  public boolean explicitDelims() {
-    return areDelimsManuallySet;
+  public void setExplicitOutputDelims(boolean explicit) {
+    this.areOutputDelimsManuallySet = explicit;
   }
 
   /**
-   * Flag the delimiter settings as explicit user settings, or implicit.
+   * @return true if the input delimiters have been explicitly set by the user.
    */
-  public void setExplicitDelims(boolean explicit) {
-    this.areDelimsManuallySet = explicit;
+  public boolean explicitInputDelims() {
+    return areInputDelimsManuallySet;
+  }
+
+  /**
+   * Flag the input delimiter settings as explicit user settings, or implicit.
+    */
+  public void setExplicitInputDelims(boolean explicit) {
+    this.areInputDelimsManuallySet = explicit;
   }
 
   public Configuration getConf() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index 5354063..2070b63 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -60,6 +60,18 @@ public final class ConfigurationConstants {
   public static final String PROP_MAPRED_JOB_TRACKER_ADDRESS =
                                 "mapred.job.tracker";
 
+   /**
+   * The Configuration property identifying the job tracker address (new).
+   */
+  public static final String PROP_MAPREDUCE_JOB_TRACKER_ADDRESS =
+    "mapreduce.jobtracker.address";
+
+  /**
+   * The Configuration property identifying the framework name. If set to YARN
+   * then we will not be in local mode.
+   */
+  public static final String PROP_MAPREDUCE_FRAMEWORK_NAME =
+    "mapreduce.framework.name";
   /**
    * The group name of task counters.
    */
@@ -78,6 +90,11 @@ public final class ConfigurationConstants {
   public static final String COUNTER_MAP_INPUT_RECORDS =
                                 "MAP_INPUT_RECORDS";
 
+  /**
+   * The name of the parameter for ToolRunner to set jars to add to distcache.
+   */
+  public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/hive/HiveImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 838f083..02596a6 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -60,6 +60,15 @@ public class HiveImport {
   private ConnManager connManager;
   private Configuration configuration;
   private boolean generateOnly;
+  private static boolean testMode = false;
+
+  public static boolean getTestMode() {
+    return testMode;
+  }
+
+  public static void setTestMode(boolean mode) {
+    testMode = mode;
+  }
 
   /** Entry point through which Hive invocation should be attempted. */
   private static final String HIVE_MAIN_CLASS =
@@ -285,6 +294,14 @@ public class HiveImport {
       throws IOException {
     SubprocessSecurityManager subprocessSM = null;
 
+    if (testMode) {
+      // We use external mock hive process for test mode as
+      // HCatalog dependency would have brought in Hive classes.
+      LOG.debug("Using external Hive process in test mode.");
+      executeExternalHiveScript(filename, env);
+      return;
+    }
+
     try {
       Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index a1ac38e..3549bda 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -164,6 +164,70 @@ public abstract class ConnManager {
     return HiveTypes.toHiveType(sqlType);
   }
 
+   /**
+   * Resolve a database-specific type to HCat data type. Largely follows Sqoop's
+   * hive translation.
+   * @param sqlType
+   *          sql type
+   * @return hcat type
+   */
+  public String toHCatType(int sqlType) {
+    switch (sqlType) {
+
+    // Ideally TINYINT and SMALLINT should be mapped to their
+    // HCat equivalents tinyint and smallint respectively, but the
+    // Sqoop Java type conversion maps them to Integer, even though
+    // the referenced Javadoc clearly recommends otherwise.
+    // Changing this now could break many sequence file usages,
+    // as value class implementations would change. So we just
+    // use the same behavior here.
+      case Types.SMALLINT:
+      case Types.TINYINT:
+      case Types.INTEGER:
+        return "int";
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+      case Types.LONGVARCHAR:
+      case Types.NVARCHAR:
+      case Types.NCHAR:
+      case Types.LONGNVARCHAR:
+      case Types.DATE:
+      case Types.TIME:
+      case Types.TIMESTAMP:
+      case Types.CLOB:
+        return "string";
+
+      case Types.FLOAT:
+      case Types.REAL:
+        return "float";
+
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        return "string";
+
+      case Types.DOUBLE:
+        return "double";
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        return "boolean";
+
+      case Types.BIGINT:
+        return "bigint";
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.BLOB:
+      case Types.LONGVARBINARY:
+        return "binary";
+
+      default:
+        throw new IllegalArgumentException(
+          "Cannot convert SQL type to HCatalog type " + sqlType);
+    }
+  }
+
   /**
    * Resolve a database-specific type to Avro data type.
    * @param sqlType     sql type

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index ef1d363..5afd90c 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -63,6 +65,13 @@ public class DataDrivenImportJob extends ImportJobBase {
   @Override
   protected void configureMapper(Job job, String tableName,
       String tableClassName) throws IOException {
+    if (isHCatJob) {
+      LOG.info("Configuring mapper for HCatalog import job");
+      job.setOutputKeyClass(LongWritable.class);
+      job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
+      job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
+      return;
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       // For text files, specify these as the output types; for
       // other types, we just use the defaults.
@@ -82,6 +91,9 @@ public class DataDrivenImportJob extends ImportJobBase {
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
+    if (options.getHCatTableName() != null) {
+      return SqoopHCatUtilities.getImportMapperClass();
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       return TextImportMapper.class;
     } else if (options.getFileLayout()
@@ -98,6 +110,10 @@ public class DataDrivenImportJob extends ImportJobBase {
   @Override
   protected Class<? extends OutputFormat> getOutputFormatClass()
       throws ClassNotFoundException {
+    if (isHCatJob) {
+      LOG.debug("Returning HCatOutputFormat for output format");
+      return SqoopHCatUtilities.getOutputFormatClass();
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       return RawKeyTextOutputFormat.class;
     } else if (options.getFileLayout()

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 1065d0b..d0be570 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.PerfCounters;
 import com.cloudera.sqoop.SqoopOptions;
@@ -57,7 +58,7 @@ public class ExportJobBase extends JobBase {
    * The (inferred) type of a file or group of files.
    */
   public enum FileType {
-    SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+    SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
   }
 
   public static final Log LOG = LogFactory.getLog(
@@ -80,6 +81,7 @@ public class ExportJobBase extends JobBase {
 
   protected ExportJobContext context;
 
+
   public ExportJobBase(final ExportJobContext ctxt) {
     this(ctxt, null, null, null);
   }
@@ -195,6 +197,9 @@ public class ExportJobBase extends JobBase {
    * @return the Path to the files we are going to export to the db.
    */
   protected Path getInputPath() throws IOException {
+    if (isHCatJob) {
+      return null;
+    }
     Path inputPath = new Path(context.getOptions().getExportDir());
     Configuration conf = options.getConf();
     inputPath = inputPath.makeQualified(FileSystem.get(conf));
@@ -207,7 +212,9 @@ public class ExportJobBase extends JobBase {
       throws ClassNotFoundException, IOException {
 
     super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-    FileInputFormat.addInputPath(job, getInputPath());
+    if (!isHCatJob) {
+      FileInputFormat.addInputPath(job, getInputPath());
+    }
   }
 
   @Override
@@ -371,6 +378,12 @@ public class ExportJobBase extends JobBase {
       }
 
       propagateOptionsToJob(job);
+      if (isHCatJob) {
+        LOG.info("Configuring HCatalog for export job");
+        SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+        hCatUtils.configureHCat(options, job, cmgr, tableName,
+          job.getConfiguration());
+      }
       configureInputFormat(job, tableName, tableClassName, null);
       configureOutputFormat(job, tableName, tableClassName);
       configureMapper(job, tableName, tableClassName);
@@ -448,6 +461,9 @@ public class ExportJobBase extends JobBase {
   }
 
   protected FileType getInputFileType() {
+    if (isHCatJob) {
+      return FileType.HCATALOG_MANAGED_FILE;
+    }
     try {
       return getFileType(context.getOptions().getConf(), getInputPath());
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 2465f3f..ab7f21e 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.PerfCounters;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ public class ImportJobBase extends JobBase {
 
     job.setOutputFormatClass(getOutputFormatClass());
 
+    if (isHCatJob) {
+      LOG.debug("Configuring output format for HCatalog  import job");
+      SqoopHCatUtilities.configureImportOutputFormat(options, job,
+        getContext().getConnManager(), tableName, job.getConfiguration());
+      return;
+    }
+
     if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
       job.getConfiguration().set("mapred.output.value.class", tableClassName);
     }
@@ -149,6 +157,11 @@ public class ImportJobBase extends JobBase {
     perfCounters.startClock();
 
     boolean success = doSubmitJob(job);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+    }
+
     perfCounters.stopClock();
 
     Counters jobCounters = job.getCounters();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 20636a0..fee78e0 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -65,7 +66,11 @@ public class JdbcExportJob extends ExportJobBase {
 
     super.configureInputFormat(job, tableName, tableClassName, splitByCol);
 
-    if (fileType == FileType.AVRO_DATA_FILE) {
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+        context.getConnManager(), tableName, job.getConfiguration());
+      return;
+    } else if (fileType == FileType.AVRO_DATA_FILE) {
       LOG.debug("Configuring for Avro export");
       ConnManager connManager = context.getConnManager();
       Map<String, Integer> columnTypeInts;
@@ -93,6 +98,9 @@ public class JdbcExportJob extends ExportJobBase {
   @Override
   protected Class<? extends InputFormat> getInputFormatClass()
       throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
+    }
     if (fileType == FileType.AVRO_DATA_FILE) {
       return AvroInputFormat.class;
     }
@@ -101,6 +109,9 @@ public class JdbcExportJob extends ExportJobBase {
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getExportMapperClass();
+    }
     switch (fileType) {
       case SEQUENCE_FILE:
         return SequenceFileExportMapper.class;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 0df1156..322df1c 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -56,6 +56,7 @@ public class JobBase {
   private Job mrJob;
 
   private ClassLoader prevClassLoader = null;
+  protected final boolean isHCatJob;
 
   public static final String PROPERTY_VERBOSE = "sqoop.verbose";
 
@@ -76,6 +77,7 @@ public class JobBase {
     this.mapperClass = mapperClass;
     this.inputFormatClass = inputFormatClass;
     this.outputFormatClass = outputFormatClass;
+    isHCatJob = options.getHCatTableName() != null;
   }
 
   /**
@@ -220,7 +222,7 @@ public class JobBase {
    */
   protected void loadJars(Configuration conf, String ormJarFile,
       String tableClassName) throws IOException {
- 
+
     boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
         || "local".equals(conf.get("mapred.job.tracker"));
     if (isLocal) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
new file mode 100644
index 0000000..47febf7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+
+/**
+ * A combined HCatInputFormat equivalent that allows us to limit the number
+ * of splits to the number of map tasks.
+ *
+ * The logic is simple. We get the list of splits from HCatInputFormat. If it
+ * is less than the number of mappers, all is good. Else, we sort the splits
+ * by size and assign them to the mappers in a simple scheme. After assigning
+ * one split to each of the mappers, the next round starts with the mapper
+ * that got the last split. That way, the total split size is distributed
+ * more uniformly than with a simple round-robin assignment.
+ */
+public class SqoopHCatExportFormat extends HCatInputFormat {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatExportFormat.class.getName());
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job)
+    throws IOException, InterruptedException {
+    List<InputSplit> hCatSplits = super.getSplits(job);
+    int hCatSplitCount = hCatSplits.size();
+    int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
+    if (expectedSplitCount == 0) {
+      expectedSplitCount = hCatSplitCount;
+    }
+    LOG.debug("Expected split count " + expectedSplitCount);
+    LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
+    // Sort the splits by length descending.
+
+    Collections.sort(hCatSplits, new Comparator<InputSplit>() {
+      @Override
+      public int compare(InputSplit is1, InputSplit is2) {
+        try {
+          return (int) (is2.getLength() - is1.getLength());
+        } catch (Exception e) {
+          LOG.warn("Exception caught while sorting Input splits " + e);
+        }
+        return 0;
+      }
+    });
+    List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+
+    // The number of splits generated by HCatInputFormat is within
+    // our limits
+
+    if (hCatSplitCount <= expectedSplitCount) {
+      for (InputSplit split : hCatSplits) {
+        List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
+        hcSplitList.add(split);
+        combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
+      }
+      return combinedSplits;
+    }
+    List<List<InputSplit>> combinedSplitList =
+      new ArrayList<List<InputSplit>>();
+    for (int i = 0; i < expectedSplitCount; i++) {
+      combinedSplitList.add(new ArrayList<InputSplit>());
+    }
+    boolean ascendingAssigment = true;
+
+    int lastSet = 0;
+    for (int i = 0; i < hCatSplitCount; ++i) {
+      int splitNum = i % expectedSplitCount;
+      int currentSet = i / expectedSplitCount;
+      if (currentSet != lastSet) {
+        ascendingAssigment = !ascendingAssigment;
+      }
+      if (ascendingAssigment) {
+        combinedSplitList.get(splitNum).add(hCatSplits.get(i));
+      } else {
+        combinedSplitList.
+          get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
+      }
+      lastSet = currentSet;
+    }
+    for (int i = 0; i < expectedSplitCount; i++) {
+      SqoopHCatInputSplit sqoopSplit =
+        new SqoopHCatInputSplit(combinedSplitList.get(i));
+      combinedSplits.add(sqoopSplit);
+    }
+
+    return combinedSplits;
+
+  }
+
+  @Override
+  public RecordReader<WritableComparable, HCatRecord>
+    createRecordReader(InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+    LOG.debug("Creating a SqoopHCatRecordReader");
+    return new SqoopHCatRecordReader(split, taskContext, this);
+  }
+
+  public RecordReader<WritableComparable, HCatRecord>
+    createHCatRecordReader(InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+    LOG.debug("Creating a base HCatRecordReader");
+    return super.createRecordReader(split, taskContext);
+  }
+}
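
A worked illustration of the assignment above (not part of the patch): with
seven HCat splits sorted largest-first and three map tasks, the serpentine
order of mapper indexes is 0, 1, 2, 2, 1, 0, 0, so mapper 0 receives the 1st,
6th, and 7th largest splits, mapper 1 the 2nd and 5th, and mapper 2 the 3rd
and 4th, which balances total bytes better than a plain round-robin assignment.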

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
new file mode 100644
index 0000000..539cedf
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * A mapper that exports HCatalog records read from combined HCat splits.
+ */
+public class SqoopHCatExportMapper
+    extends
+  AutoProgressMapper<WritableComparable, HCatRecord,
+  SqoopRecord, WritableComparable> {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatExportMapper.class.getName());
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  private List<HCatFieldSchema> hCatSchemaFields;
+
+  private SqoopRecord sqoopRecord;
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+  private static final String TIME_TYPE = "java.sql.Time";
+  private static final String DATE_TYPE = "java.sql.Date";
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+  private static final String FLOAT_TYPE = "Float";
+  private static final String DOUBLE_TYPE = "Double";
+  private static final String BYTE_TYPE = "Byte";
+  private static final String SHORT_TYPE = "Short";
+  private static final String INTEGER_TYPE = "Integer";
+  private static final String LONG_TYPE = "Long";
+  private static final String BOOLEAN_TYPE = "Boolean";
+  private static final String STRING_TYPE = "String";
+  private static final String BYTESWRITABLE =
+    "org.apache.hadoop.io.BytesWritable";
+  private static boolean debugHCatExportMapper = false;
+  private MapWritable colTypesJava;
+  private MapWritable colTypesSql;
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    colTypesJava = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+    colTypesSql = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+    // Instantiate a copy of the user's class to hold and parse the record.
+
+    String recordClassName = conf.get(
+      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+        + ") is not set!");
+    }
+    debugHCatExportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+    try {
+      Class cls = Class.forName(recordClassName, true,
+        Thread.currentThread().getContextClassLoader());
+      sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == sqoopRecord) {
+      throw new IOException("Could not instantiate object of type "
+        + recordClassName);
+    }
+
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo =
+      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+    HCatSchema partitionSchema =
+      jobInfo.getTableInfo().getPartitionColumns();
+    hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+    hCatSchemaFields = hCatFullTableSchema.getFields();
+
+  }
+
+  @Override
+  public void map(WritableComparable key, HCatRecord value,
+    Context context)
+    throws IOException, InterruptedException {
+    context.write(convertToSqoopRecord(value), NullWritable.get());
+  }
+
+  private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+    throws IOException {
+    Text key = new Text();
+    for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+      String colName = e.getKey();
+      String hfn = colName.toLowerCase();
+      key.set(hfn);
+      String javaColType = colTypesJava.get(key).toString();
+      int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+      HCatFieldSchema field =
+        hCatFullTableSchema.get(hfn);
+      HCatFieldSchema.Type fieldType = field.getType();
+      Object hCatVal =
+        hcr.get(hfn, hCatFullTableSchema);
+      String hCatTypeString = field.getTypeString();
+      Object sqlVal = convertToSqoop(hCatVal, fieldType,
+        javaColType, hCatTypeString);
+      if (debugHCatExportMapper) {
+        LOG.debug("hCatVal " + hCatVal + " of type "
+          + (hCatVal == null ? null : hCatVal.getClass().getName())
+          + ",sqlVal " + sqlVal + " of type "
+          + (sqlVal == null ? null : sqlVal.getClass().getName())
+          + ",java type " + javaColType + ", sql type = "
+          + SqoopHCatUtilities.sqlTypeString(sqlType));
+      }
+      sqoopRecord.setField(colName, sqlVal);
+    }
+    return sqoopRecord;
+  }
+
+  private Object convertToSqoop(Object val,
+    HCatFieldSchema.Type fieldType, String javaColType,
+    String hCatTypeString) throws IOException {
+
+    if (val == null) {
+      return null;
+    }
+
+    switch (fieldType) {
+      case INT:
+      case TINYINT:
+      case SMALLINT:
+      case FLOAT:
+      case DOUBLE:
+        val = convertNumberTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BOOLEAN:
+        val = convertBooleanTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BIGINT:
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date((Long) val);
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time((Long) val);
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) val);
+        } else {
+          val = convertNumberTypes(val, javaColType);
+          if (val != null) {
+            return val;
+          }
+        }
+        break;
+      case STRING:
+        val = convertStringTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BINARY:
+        val = convertBinaryTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case ARRAY:
+      case MAP:
+      case STRUCT:
+      default:
+        throw new IOException("Cannot convert HCatalog type "
+          + fieldType);
+    }
+    LOG.error("Cannot convert HCatalog object of "
+      + " type " + hCatTypeString + " to java object type "
+      + javaColType);
+    return null;
+  }
+
+  private Object convertBinaryTypes(Object val, String javaColType) {
+    byte[] bb = (byte[]) val;
+    if (javaColType.equals(BYTESWRITABLE)) {
+      BytesWritable bw = new BytesWritable();
+      bw.set(bb, 0, bb.length);
+      return bw;
+    }
+    return null;
+  }
+
+  private Object convertStringTypes(Object val, String javaColType) {
+    String valStr = val.toString();
+    if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(valStr);
+    } else if (javaColType.equals(DATE_TYPE)
+      || javaColType.equals(TIME_TYPE)
+      || javaColType.equals(TIMESTAMP_TYPE)) {
+      // Depending on the version, Oracle may expect a Timestamp even for
+      // DATE columns, so treat all date/time types as assignment compatible.
+      if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+        Date d = Date.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return d;
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(d.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(d.getTime());
+        }
+      } else if (valStr.length() == 8) { // time in hh:mm:ss
+        Time t = Time.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(t.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return t;
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(t.getTime());
+        }
+      } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:mm:ss
+        Timestamp ts = Timestamp.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(ts.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(ts.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return ts;
+        }
+      } else {
+        return null;
+      }
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return valStr;
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return Boolean.valueOf(valStr);
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return Byte.parseByte(valStr);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return Short.parseShort(valStr);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return Integer.parseInt(valStr);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return Long.parseLong(valStr);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return Float.parseFloat(valStr);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return Double.parseDouble(valStr);
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val, String javaColType) {
+    Boolean b = (Boolean) val;
+    if (javaColType.equals(BOOLEAN_TYPE)) {
+      return b;
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return (byte) (b ? 1 : 0);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return (short) (b ? 1 : 0);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return (int) (b ? 1 : 0);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return (long) (b ? 1 : 0);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return (float) (b ? 1 : 0);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return (double) (b ? 1 : 0);
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(b ? 1 : 0);
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val, String javaColType) {
+    Number n = (Number) val;
+    if (javaColType.equals(BYTE_TYPE)) {
+      return n.byteValue();
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return n.shortValue();
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return n.intValue();
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return n.longValue();
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return n.floatValue();
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return n.doubleValue();
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(n.doubleValue());
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return n.toString();
+    }
+    return null;
+  }
+
+}

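The convertToSqoop() path above maps each HCatalog field value to the Java
type expected by the generated SqoopRecord, using the column-type maps that
were serialized into the job configuration. Below is a self-contained sketch
of the string-to-date/timestamp dispatch that convertStringTypes() performs,
using only JDK types; the class and method names are illustrative and not
part of Sqoop.

import java.sql.Date;
import java.sql.Timestamp;

public class HCatStringToSqlSketch {
  // Mirrors the length-based dispatch in convertStringTypes():
  // 10 characters -> yyyy-mm-dd, 19 characters -> yyyy-mm-dd hh:mm:ss.
  static Object toDateOrTimestamp(String valStr, String javaColType) {
    if (valStr.length() == 10) {
      Date d = Date.valueOf(valStr);
      return "java.sql.Timestamp".equals(javaColType)
          ? new Timestamp(d.getTime()) : d;
    } else if (valStr.length() == 19) {
      Timestamp ts = Timestamp.valueOf(valStr);
      return "java.sql.Date".equals(javaColType)
          ? new Date(ts.getTime()) : ts;
    }
    return null; // unrecognized format, as in the mapper
  }

  public static void main(String[] args) {
    // A DATE-typed column keeps the day; a TIMESTAMP column keeps the time.
    System.out.println(toDateOrTimestamp("2013-06-07", "java.sql.Date"));
    System.out.println(
        toDateOrTimestamp("2013-06-07 16:34:36", "java.sql.Timestamp"));
  }
}
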
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
new file mode 100644
index 0000000..4f0ff1b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * A mapper for HCatalog import.
+ */
+public class SqoopHCatImportMapper extends
+  SqoopMapper<WritableComparable, SqoopRecord,
+  WritableComparable, HCatRecord> {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatImportMapper.class.getName());
+
+  private static boolean debugHCatImportMapper = false;
+
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  private int fieldCount;
+  private boolean bigDecimalFormatString;
+  private LargeObjectLoader lobLoader;
+  private HCatSchema partitionSchema = null;
+  private HCatSchema dataColsSchema = null;
+  private String stringDelimiterReplacements = null;
+  private ArrayWritable delimCharsArray;
+  private String hiveDelimsReplacement;
+  private boolean doHiveDelimsReplacement = false;
+  private DelimiterSet hiveDelimiters;
+  private String staticPartitionKey;
+  private int[] hCatFieldPositions;
+  private int colCount;
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo =
+      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+    partitionSchema =
+      jobInfo.getTableInfo().getPartitionColumns();
+    StringBuilder storerInfoStr = new StringBuilder(1024);
+    StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+    storerInfoStr.append("HCatalog Storer Info : ")
+      .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
+      .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+      .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+      .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+    Properties storerProperties = storerInfo.getProperties();
+    if (!storerProperties.isEmpty()) {
+      storerInfoStr.append("\nStorer properties ");
+      for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
+        String key = (String) entry.getKey();
+        Object val = entry.getValue();
+        storerInfoStr.append("\n\t").append(key).append('=').append(val);
+      }
+    }
+    storerInfoStr.append("\n");
+    LOG.info(storerInfoStr);
+
+    hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+    fieldCount = hCatFullTableSchema.size();
+    lobLoader = new LargeObjectLoader(conf,
+      new Path(jobInfo.getTableInfo().getTableLocation()));
+    bigDecimalFormatString = conf.getBoolean(
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    debugHCatImportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
+    IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
+        SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
+    hiveDelimiters = new DelimiterSet(
+      (char) delimChars[0].get(), (char) delimChars[1].get(),
+      (char) delimChars[2].get(), (char) delimChars[3].get(),
+      delimChars[4].get() == 1);
+    hiveDelimsReplacement =
+      conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
+    if (hiveDelimsReplacement == null) {
+      hiveDelimsReplacement = "";
+    }
+    doHiveDelimsReplacement = Boolean.valueOf(conf.get(
+      SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
+
+    IntWritable[] fPos = DefaultStringifier.loadArray(conf,
+        SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
+    hCatFieldPositions = new int[fPos.length];
+    for (int i = 0; i < fPos.length; ++i) {
+      hCatFieldPositions[i] = fPos[i].get();
+    }
+
+    LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
+    LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
+    LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
+    staticPartitionKey =
+      conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
+    LOG.debug("Static partition key used : " + staticPartitionKey);
+
+
+  }
+
+  @Override
+  public void map(WritableComparable key, SqoopRecord value,
+    Context context)
+    throws IOException, InterruptedException {
+
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      value.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+    if (colCount == -1) {
+      colCount = value.getFieldMap().size();
+    }
+    context.write(key, convertToHCatRecord(value));
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+
+  private HCatRecord convertToHCatRecord(SqoopRecord sqr)
+    throws IOException {
+    Map<String, Object> fieldMap = sqr.getFieldMap();
+    HCatRecord result = new DefaultHCatRecord(fieldCount);
+
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      String key = entry.getKey();
+      Object val = entry.getValue();
+      String hfn = key.toLowerCase();
+      if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
+        continue;
+      }
+      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+      if (debugHCatImportMapper) {
+        LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
+          + " of type " + (val == null ? null : val.getClass().getName())
+          + ", hcattype " + hfs.getTypeString());
+      }
+      Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
+
+      result.set(hfn, hCatFullTableSchema, hCatVal);
+    }
+
+    return result;
+  }
+
+
+  private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
+    String hCatTypeString) {
+
+    if (val == null) {
+      return null;
+    }
+
+    Object retVal = null;
+
+    if (val instanceof Number) {
+      retVal = convertNumberTypes(val, hfsType);
+    } else if (val instanceof Boolean) {
+      retVal = convertBooleanTypes(val, hfsType);
+    } else if (val instanceof String) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        String str = (String) val;
+        if (doHiveDelimsReplacement) {
+          retVal = FieldFormatter
+            .hiveStringReplaceDelims(str, hiveDelimsReplacement,
+                hiveDelimiters);
+        } else {
+          retVal = str;
+        }
+      }
+    } else if (val instanceof java.util.Date) {
+      retVal = convertDateTypes(val, hfsType);
+    } else if (val instanceof BytesWritable) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BytesWritable bw = (BytesWritable) val;
+        retVal = bw.getBytes();
+      }
+    } else if (val instanceof BlobRef) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BlobRef br = (BlobRef) val;
+        byte[] bytes = br.isExternal() ? br.toString().getBytes()
+          : br.getData();
+        retVal = bytes;
+      }
+    } else if (val instanceof ClobRef) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        ClobRef cr = (ClobRef) val;
+        String s = cr.isExternal() ? cr.toString() : cr.getData();
+        retVal = s;
+      }
+    } else {
+      throw new UnsupportedOperationException("Objects of type "
+        + val.getClass().getName() + " are not suported");
+    }
+    if (retVal == null) {
+      LOG.error("Objects of type "
+        + val.getClass().getName() + " can not be mapped to HCatalog type "
+        + hCatTypeString);
+    }
+    return retVal;
+  }
+
+  private Object convertDateTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    if (val instanceof java.sql.Date) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Date) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Time) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Time) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Timestamp) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Timestamp) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    Boolean b = (Boolean) val;
+    if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return b;
+    } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return (byte) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return (short) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return (int) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return (long) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return (float) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return (double) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    if (!(val instanceof Number)) {
+      return null;
+    }
+    if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
+      BigDecimal bd = (BigDecimal) val;
+      if (bigDecimalFormatString) {
+        return bd.toPlainString();
+      } else {
+        return bd.toString();
+      }
+    }
+    Number n = (Number) val;
+    if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return n.byteValue();
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return n.shortValue();
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return n.intValue();
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return n.longValue();
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return n.floatValue();
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return n.doubleValue();
+    } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return n.toString();
+    }
+    return null;
+  }
+}

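Going the other way, toHCat() above maps SqoopRecord field values onto
HCatalog column types: for example, a java.sql.Timestamp headed for a BIGINT
column becomes epoch milliseconds, and a BigDecimal headed for a STRING
column can be rendered with toPlainString() when the bigdecimal format flag
(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT) is enabled. A small JDK-only
sketch of those two conversions follows; the names below are illustrative.

import java.math.BigDecimal;
import java.sql.Timestamp;

public class HCatImportConversionSketch {
  // Illustrative only; the real mapping lives in toHCat()/convertNumberTypes().
  static long timestampToBigint(Timestamp ts) {
    return ts.getTime(); // BIGINT column receives epoch milliseconds
  }

  static String bigDecimalToHCatString(BigDecimal bd,
      boolean bigDecimalFormatString) {
    // toPlainString() avoids scientific notation in the stored value.
    return bigDecimalFormatString ? bd.toPlainString() : bd.toString();
  }

  public static void main(String[] args) {
    Timestamp ts = Timestamp.valueOf("2013-06-07 16:34:36");
    System.out.println(timestampToBigint(ts));
    BigDecimal bd = new BigDecimal("1E+10");
    System.out.println(bigDecimalToHCatString(bd, true));  // 10000000000
    System.out.println(bigDecimalToHCatString(bd, false)); // 1E+10
  }
}
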
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
new file mode 100644
index 0000000..5a2e48a
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * An input split that combines several HCatSplits into one unit of work.
+ *
+ */
+public class SqoopHCatInputSplit extends InputSplit implements Writable {
+  private List<HCatSplit> hCatSplits;
+  private String[] hCatLocations;
+  private long inputLength;
+
+  public SqoopHCatInputSplit() {
+  }
+
+  public SqoopHCatInputSplit(List<InputSplit> splits) {
+    hCatSplits = new ArrayList<HCatSplit>();
+    Set<String> locations = new HashSet<String>();
+    for (int i = 0; i < splits.size(); ++i) {
+      HCatSplit hsSplit = (HCatSplit) splits.get(i);
+      hCatSplits.add(hsSplit);
+      this.inputLength += hsSplit.getLength();
+      locations.addAll(Arrays.asList(hsSplit.getLocations()));
+    }
+    this.hCatLocations = locations.toArray(new String[0]);
+  }
+
+  public int length() {
+    return this.hCatSplits.size();
+  }
+
+  public HCatSplit get(int index) {
+    return this.hCatSplits.get(index);
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    if (this.inputLength == 0L) {
+      for (HCatSplit split : this.hCatSplits) {
+        this.inputLength += split.getLength();
+      }
+    }
+    return this.inputLength;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    if (this.hCatLocations == null) {
+      Set<String> locations = new HashSet<String>();
+      for (HCatSplit split : this.hCatSplits) {
+        locations.addAll(Arrays.asList(split.getLocations()));
+      }
+      this.hCatLocations = locations.toArray(new String[0]);
+    }
+    return this.hCatLocations;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(this.inputLength);
+    out.writeInt(this.hCatSplits.size());
+    for (HCatSplit split : this.hCatSplits) {
+      split.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.inputLength = in.readLong();
+    int size = in.readInt();
+    this.hCatSplits = new ArrayList<HCatSplit>(size);
+    for (int i = 0; i < size; ++i) {
+      HCatSplit hs = new HCatSplit();
+      hs.readFields(in);
+      hCatSplits.add(hs);
+    }
+  }
+}
+
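
The write()/readFields() pair above persists the aggregate length, the count
of wrapped HCatSplits, and then each split in order; the cached location
array is not written and is recomputed lazily in getLocations() after
deserialization. Below is a JDK-only sketch of the same write/read symmetry,
using a plain long as a stand-in for each wrapped split (constructing a real
HCatSplit needs table metadata); all names in the sketch are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombinedSplitRoundTripSketch {
  // write(): aggregate length, element count, then each element in order.
  static byte[] serialize(long totalLength, List<Long> parts)
      throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeLong(totalLength);
    out.writeInt(parts.size());
    for (long part : parts) {
      out.writeLong(part);
    }
    out.flush();
    return bos.toByteArray();
  }

  // readFields(): consume the fields in exactly the same order.
  static List<Long> deserialize(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    long totalLength = in.readLong(); // recomputable, but must be consumed
    int size = in.readInt();
    List<Long> parts = new ArrayList<Long>(size);
    for (int i = 0; i < size; ++i) {
      parts.add(in.readLong());
    }
    return parts;
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = serialize(300L, Arrays.asList(100L, 120L, 80L));
    System.out.println(deserialize(bytes)); // [100, 120, 80]
  }
}
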