You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/07 16:34:36 UTC
[3/3] git commit: SQOOP-931: Integrate HCatalog with Sqoop
SQOOP-931: Integrate HCatalog with Sqoop
(Venkat Ranganathan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5e88d43b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5e88d43b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5e88d43b
Branch: refs/heads/trunk
Commit: 5e88d43b5af024c1b9fd82029f7de4c325dcf009
Parents: b07906a
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 7 07:33:21 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 7 07:33:21 2013 -0700
----------------------------------------------------------------------
bin/configure-sqoop | 30 +
bin/configure-sqoop.cmd | 9 +
build.xml | 21 +-
ivy.xml | 18 +-
ivy/ivysettings.xml | 15 +-
src/docs/user/SqoopUserGuide.txt | 2 +
src/docs/user/hcatalog.txt | 313 ++++
src/java/org/apache/sqoop/SqoopOptions.java | 107 ++-
.../sqoop/config/ConfigurationConstants.java | 17 +
src/java/org/apache/sqoop/hive/HiveImport.java | 17 +
src/java/org/apache/sqoop/manager/ConnManager.java | 64 +
.../sqoop/mapreduce/DataDrivenImportJob.java | 16 +
.../org/apache/sqoop/mapreduce/ExportJobBase.java | 20 +-
.../org/apache/sqoop/mapreduce/ImportJobBase.java | 13 +
.../org/apache/sqoop/mapreduce/JdbcExportJob.java | 13 +-
src/java/org/apache/sqoop/mapreduce/JobBase.java | 4 +-
.../mapreduce/hcat/SqoopHCatExportFormat.java | 138 ++
.../mapreduce/hcat/SqoopHCatExportMapper.java | 349 +++++
.../mapreduce/hcat/SqoopHCatImportMapper.java | 343 ++++
.../sqoop/mapreduce/hcat/SqoopHCatInputSplit.java | 109 ++
.../mapreduce/hcat/SqoopHCatRecordReader.java | 153 ++
.../sqoop/mapreduce/hcat/SqoopHCatUtilities.java | 1215 +++++++++++++++
src/java/org/apache/sqoop/tool/BaseSqoopTool.java | 231 +++-
src/java/org/apache/sqoop/tool/CodeGenTool.java | 3 +
src/java/org/apache/sqoop/tool/ExportTool.java | 9 +-
src/java/org/apache/sqoop/tool/ImportTool.java | 14 +-
src/perftest/ExportStressTest.java | 2 +-
src/test/com/cloudera/sqoop/ThirdPartyTests.java | 7 +
.../com/cloudera/sqoop/hive/TestHiveImport.java | 10 +
.../cloudera/sqoop/testutil/BaseSqoopTestCase.java | 2 +-
.../cloudera/sqoop/testutil/ExportJobTestCase.java | 14 +-
.../org/apache/sqoop/hcat/HCatalogExportTest.java | 377 +++++
.../org/apache/sqoop/hcat/HCatalogImportTest.java | 712 +++++++++
.../org/apache/sqoop/hcat/HCatalogTestUtils.java | 855 ++++++++++
.../org/apache/sqoop/hcat/TestHCatalogBasic.java | 251 +++
testdata/hcatalog/conf/hive-log4j.properties | 87 +
testdata/hcatalog/conf/hive-site.xml | 26 +
testdata/hcatalog/conf/log4j.properties | 55 +
38 files changed, 5596 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop b/bin/configure-sqoop
index 61ff3f2..178720d 100755
--- a/bin/configure-sqoop
+++ b/bin/configure-sqoop
@@ -54,9 +54,22 @@ if [ -z "${HADOOP_MAPRED_HOME}" ]; then
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
fi
fi
+
+# We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+# so that hcat script works correctly on BigTop
+if [ -z "${HADOOP_HOME}" ]; then
+ if [ -n "${HADOOP_COMMON_HOME}" ]; then
+ HADOOP_HOME=${HADOOP_COMMON_HOME}
+ export HADOOP_HOME
+ fi
+fi
+
if [ -z "${HBASE_HOME}" ]; then
HBASE_HOME=/usr/lib/hbase
fi
+if [ -z "${HCAT_HOME}" ]; then
+ HCAT_HOME=/usr/lib/hcatalog
+fi
# Check: If we can't find our dependencies, give up here.
if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -76,6 +89,12 @@ if [ ! -d "${HBASE_HOME}" ]; then
echo 'Please set $HBASE_HOME to the root of your HBase installation.'
fi
+## Moved to be a runtime check in sqoop.
+if [ ! -d "${HCAT_HOME}" ]; then
+ echo "Warning: $HCAT_HOME does not exist! HCatalog jobs will fail."
+ echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
+fi
+
# Where to find the main Sqoop jar
SQOOP_JAR_DIR=$SQOOP_HOME
@@ -106,6 +125,15 @@ if [ -e "$HBASE_HOME/bin/hbase" ]; then
SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
fi
+# Add HCatalog to dependency list
+if [ -e "${HCAT_HOME}/bin/hcat" ]; then
+ TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`${HCAT_HOME}/bin/hcat -classpath`
+  if [ -n "${HIVE_CONF_DIR}" ]; then
+ TMP_SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}:${HIVE_CONF_DIR}
+ fi
+ SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
+fi
+
ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
if [ -d "${ZOOCFGDIR}" ]; then
SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -136,4 +164,6 @@ export HADOOP_CLASSPATH
export HADOOP_COMMON_HOME
export HADOOP_MAPRED_HOME
export HBASE_HOME
+export HCAT_HOME
+export HIVE_CONF_DIR
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/bin/configure-sqoop.cmd
----------------------------------------------------------------------
diff --git a/bin/configure-sqoop.cmd b/bin/configure-sqoop.cmd
index f5fd608..ec57e37 100644
--- a/bin/configure-sqoop.cmd
+++ b/bin/configure-sqoop.cmd
@@ -55,6 +55,15 @@ if not defined HADOOP_MAPRED_HOME (
exit /b 1
)
)
+
+:: We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+:: so that hcat script works correctly on BigTop
+if not defined HADOOP_HOME (
+ if defined HADOOP_COMMON_HOME (
+ set HADOOP_HOME=%HADOOP_COMMON_HOME%
+ )
+)
+
:: Check for HBase dependency
if not defined HBASE_HOME (
if defined HBASE_VERSION (
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 636c103..b4b08e5 100644
--- a/build.xml
+++ b/build.xml
@@ -51,6 +51,7 @@
<property name="hbase.version" value="0.90.3-cdh3u1" />
<property name="zookeeper.version" value="3.3.3-cdh3u1" />
<property name="hadoop.version.full" value="0.20" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
<elseif>
@@ -60,6 +61,7 @@
<property name="hbase.version" value="0.92.0" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="0.23" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -70,6 +72,7 @@
<property name="hbase.version" value="0.92.0" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="1.0.0" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -80,6 +83,7 @@
<property name="hbase.version" value="0.94.2" />
<property name="zookeeper.version" value="3.4.2" />
<property name="hadoop.version.full" value="2.0.4-alpha" />
+ <property name="hcatalog.version" value="0.11.0" />
</then>
</elseif>
@@ -600,6 +604,7 @@
<tarfileset dir="${build.dir}" mode="755">
<include name="${bin.artifact.name}/bin/*" />
<include name="${bin.artifact.name}/testdata/hive/bin/*" />
+ <include name="${bin.artifact.name}/testdata/hcatalog/conf/*" />
<include name="${bin.artifact.name}/**/*.sh" />
</tarfileset>
</tar>
@@ -643,12 +648,14 @@
<tarfileset dir="${build.dir}" mode="664">
<exclude name="${src.artifact.name}/bin/*" />
<exclude name="${src.artifact.name}/testdata/hive/bin/*" />
+ <exclude name="${src.artifact.name}/testdata/hcatalog/conf/*" />
<exclude name="${src.artifact.name}/**/*.sh" />
<include name="${src.artifact.name}/**" />
</tarfileset>
<tarfileset dir="${build.dir}" mode="755">
<include name="${src.artifact.name}/bin/*" />
<include name="${src.artifact.name}/testdata/hive/bin/*" />
+ <include name="${src.artifact.name}/testdata/hcatalog/conf/*" />
<include name="${src.artifact.name}/**/*.sh" />
</tarfileset>
</tar>
@@ -658,6 +665,9 @@
<target name="test-prep" depends="test-prep-normal,test-prep-thirdparty,
test-prep-manual"/>
+ <path id="hcatalog.conf.dir">
+ <pathelement location="${basedir}/testdata/hcatalog/conf"/>
+ </path>
<target name="test-eval-condition">
<condition property="thirdparty_or_manual">
<or>
@@ -667,6 +677,8 @@
</condition>
</target>
+
+
<target name="test-prep-normal" unless="thirdparty_or_manual"
depends="test-eval-condition">
<!-- Set this to run all the "standard" tests -->
@@ -712,7 +724,7 @@
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
<delete dir="${build.test}/data"/>
- <mkdir dir="${build.test}/data" />
+ <mkdir dir="${build.test}/data/sqoop" />
<mkdir dir="${cobertura.class.dir}" />
<junit
printsummary="yes" showoutput="${test.output}"
@@ -803,10 +815,17 @@
<sysproperty key="java.security.krb5.kdc"
value="${java.security.krb5.kdc}"/>
+ <!-- Location of Hive logs -->
+ <!--<sysproperty key="hive.log.dir"
+ value="${test.build.data}/sqoop/logs"/> -->
+
<classpath>
<!-- instrumented classes go ahead of normal classes -->
<pathelement location="${cobertura.class.dir}" />
+ <!-- Location of hive-site xml and other hadoop config files -->
+ <path refid="hcatalog.conf.dir" />
+
<!-- main classpath here. -->
<path refid="test.classpath" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1fa4dd1..750adfc 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -37,10 +37,15 @@ under the License.
extends="runtime"
description="artifacts needed to compile/test the application"/>
<conf name="hbase" visibility="private" />
- <conf name="hadoop23" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop20" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop100" visibility="private" extends="common,runtime,hbase" />
- <conf name="hadoop200" visibility="private" extends="common,runtime,hbase" />
+ <conf name="hcatalog" visibility="private" />
+ <conf name="hadoop23" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop20" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop100" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
+ <conf name="hadoop200" visibility="private"
+ extends="common,runtime,hbase,hcatalog" />
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -172,6 +177,11 @@ under the License.
<exclude org="com.cloudera.cdh" module="zookeeper-ant" />
</dependency>
+ <dependency org="org.apache.hcatalog" name="hcatalog-core"
+ rev="${hcatalog.version}" conf="hcatalog->default">
+ <artifact name="hcatalog-core" type="jar"/>
+ </dependency>
+
<exclude org="org.apache.hadoop" module="avro"/>
<exclude org="commons-daemon" module="commons-daemon" />
<exclude type="pom" />
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/ivy/ivysettings.xml
----------------------------------------------------------------------
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index c4cc561..2920c89 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -42,6 +42,9 @@ under the License.
<property name="releases.cloudera.com"
value="https://repository.cloudera.com/content/repositories/releases/"
override="false"/>
+ <property name="www.datanucleus.org"
+ value="http://www.datanucleus.org/downloads/maven2/"
+ override="false"/>
<property name="maven2.pattern"
value="[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier])"/>
<property name="repo.dir" value="${user.home}/.m2/repository"/>
@@ -52,6 +55,8 @@ under the License.
<resolvers>
<ibiblio name="maven2" root="${repo.maven.org}"
pattern="${maven2.pattern.ext}" m2compatible="true"/>
+ <ibiblio name="datanucleus" root="${www.datanucleus.org}"
+ pattern="${maven2.pattern.ext}" m2compatible="true"/>
<ibiblio name="cloudera-releases" root="${releases.cloudera.com}"
pattern="${maven2.pattern.ext}" m2compatible="true"/>
<ibiblio name="apache-snapshot" root="${snapshot.apache.org}"
@@ -67,16 +72,18 @@ under the License.
<chain name="default" dual="true" checkmodified="true"
changingPattern=".*SNAPSHOT">
<resolver ref="fs"/>
- <resolver ref="apache-snapshot"/>
+ <resolver ref="apache-snapshot"/>
+ <resolver ref="datanucleus"/>
<resolver ref="cloudera-releases"/>
- <resolver ref="cloudera-staging"/>
+ <resolver ref="cloudera-staging"/>
<resolver ref="maven2"/>
</chain>
<chain name="internal" dual="true">
<resolver ref="fs"/>
- <resolver ref="apache-snapshot"/>
- <resolver ref="cloudera-staging"/>
+ <resolver ref="apache-snapshot"/>
+ <resolver ref="datanucleus"/>
+ <resolver ref="cloudera-staging"/>
<resolver ref="maven2"/>
</chain>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/SqoopUserGuide.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt
index 01ac1cf..2e88887 100644
--- a/src/docs/user/SqoopUserGuide.txt
+++ b/src/docs/user/SqoopUserGuide.txt
@@ -72,6 +72,8 @@ include::help.txt[]
include::version.txt[]
+include::hcatalog.txt[]
+
include::compatibility.txt[]
include::connectors.txt[]
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/docs/user/hcatalog.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
new file mode 100644
index 0000000..b8e495e
--- /dev/null
+++ b/src/docs/user/hcatalog.txt
@@ -0,0 +1,313 @@
+
+////
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+////
+
+Sqoop-HCatalog Integration
+--------------------------
+
+HCatalog Background
+~~~~~~~~~~~~~~~~~~~
+
+HCatalog is a table and storage management service for Hadoop that enables
+users with different data processing tools – Pig, MapReduce, and Hive –
+to more easily read and write data on the grid. HCatalog’s table abstraction
+presents users with a relational view of data in the Hadoop distributed
+file system (HDFS) and ensures that users need not worry about where or
+in what format their data is stored: RCFile format, text files, or
+SequenceFiles.
+
+HCatalog supports reading and writing files in any format for which a Hive
+SerDe (serializer-deserializer) has been written. By default, HCatalog
+supports RCFile, CSV, JSON, and SequenceFile formats. To use a custom
+format, you must provide the InputFormat and OutputFormat as well as the SerDe.
+
+The ability of HCatalog to abstract various storage formats is used in
+providing the RCFile (and future file types) support to Sqoop.
+
+Exposing HCatalog Tables to Sqoop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog integration with Sqoop is patterned on an existing feature set that
+supports Avro and Hive tables. Five new command line options are introduced,
+and some command line options defined for Hive are reused.
+
+New Command Line Options
+^^^^^^^^^^^^^^^^^^^^^^^^
+
++--hcatalog-database+::
+Specifies the database name for the HCatalog table. If not specified,
+the default database name +default+ is used. Providing the
++--hcatalog-database+ option without +--hcatalog-table+ is an error.
+This is not a required option.
+
++--hcatalog-table+::
+The argument value for this option is the HCatalog tablename.
+The presence of the +--hcatalog-table+ option signifies that the import
+or export job is done using HCatalog tables, and it is a required option for
+HCatalog jobs.
+
++--hcatalog-home+::
+The home directory for the HCatalog installation. The directory is
+expected to have a +lib+ subdirectory and a +share/hcatalog+ subdirectory
+with necessary HCatalog libraries. If not specified, the system property
++hcatalog.home+ will be checked and failing that, a system environment
+variable +HCAT_HOME+ will be checked. If none of these are set, the
+default value will be used and currently the default is set to
++/usr/lib/hcatalog+.
+This is not a required option.
+
++--create-hcatalog-table+::
+
+This option specifies whether an HCatalog table should be created
+automatically when importing data. By default, HCatalog tables are assumed
+to exist. The table name will be the same as the database table name
+translated to lower case. Further described in +Automatic Table Creation+
+below.
+
++--hcatalog-storage-stanza+::
+
+This option specifies the storage stanza to be appended to the table.
+Further described in +Automatic Table Creation+ below.
+
+Supported Sqoop Hive Options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following Sqoop options are also used along with the +--hcatalog-table+
+option to provide additional input to the HCatalog jobs. Some of the existing
+Hive import job options are reused with HCatalog jobs instead of creating
+HCatalog-specific options for the same purpose.
+
++--map-column-hive+::
+This option maps a database column to HCatalog with a specific HCatalog
+type.
+
++--hive-home+::
+The Hive home location.
+
++--hive-partition-key+::
+Used for static partitioning filter. The partitioning key should be of
+type STRING. There can be only one static partitioning key.
+
++--hive-partition-value+::
+The value associated with the partition.
+
+Unsupported Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Unsupported Sqoop Hive Import Options
++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop Hive import options are not supported with HCatalog jobs.
+
+* +--hive-import+
+* +--hive-overwrite+
+
+Unsupported Sqoop Export and Import Options
++++++++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop export and import options are not supported with HCatalog jobs.
+
+* +--direct+
+* +--export-dir+
+* +--target-dir+
+* +--warehouse-dir+
+* +--append+
+* +--as-sequencefile+
+* +--as-avrodatafile+
+
+Ignored Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^
+
+The following options are ignored with HCatalog jobs.
+
+* All input delimiter options are ignored.
+
+* Output delimiters are generally ignored unless either
++--hive-drop-import-delims+ or +--hive-delims-replacement+ is used. When the
++--hive-drop-import-delims+ or +--hive-delims-replacement+ option is
+specified, all +CHAR+ type database table columns will be post-processed
+to either remove or replace the delimiters, respectively. See +Delimited Text
+Formats and Field and Line Delimiter Characters+ below. This is only needed
+if the HCatalog table uses text formats.
+
+Automatic Table Creation
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the key features of Sqoop is to manage and create the table metadata
+when importing into Hadoop. HCatalog import jobs also provide for this
+feature with the option +--create-hcatalog-table+. Furthermore, one of the
+important benefits of the HCatalog integration is to provide storage
+agnosticism to Sqoop data movement jobs. To provide for that feature,
+HCatalog import jobs provide an option that lets a user specify the
+storage format for the created table.
+
+The option +--create-hcatalog-table+ is used as an indicator that a table
+has to be created as part of the HCatalog import job. If the option
++--create-hcatalog-table+ is specified and the table exists, then the
+table creation will fail and the job will be aborted.
+
+The option +--hcatalog-storage-stanza+ can be used to specify the storage
+format of the newly created table. The default value for this option is
++stored as rcfile+. The value specified for this option is assumed to be a
+valid Hive storage format expression. It will be appended to the +create table+
+command generated by the HCatalog import job as part of automatic table
+creation. Any error in the storage stanza will cause the table creation to
+fail and the import job will be aborted.
+
+Any additional resources needed to support the storage format referenced in
+the option +--hcatalog-storage-stanza+ should be provided to the job either
+by placing them in +$HIVE_HOME/lib+ or by providing them in +HADOOP_CLASSPATH+
+and +LIBJAR+ files.
+
+If the option +--hive-partition-key+ is specified, then the value of this
+option is used as the partitioning key for the newly created table. Only
+one partitioning key can be specified with this option.
+
+Object names are mapped to the lowercase equivalents as specified below
+when mapped to an HCatalog table. This includes the table name (which
+is the same as the external store table name converted to lower case)
+and field names.
+
+Delimited Text Formats and Field and Line Delimiter Characters
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog supports delimited text format as one of the table storage formats.
+But when delimited text is used and the imported data has fields that contain
+those delimiters, then the data may be parsed into a different number of
+fields and records by Hive, thereby losing data fidelity.
+
+For this case, one of these existing Sqoop import options can be used:
+
+* +--hive-delims-replacement+
+
+* +--hive-drop-import-delims+
+
+If either of these options is provided for import, then any column of type
+STRING will be formatted with the Hive delimiter processing and then written
+to the HCatalog table.
+
+HCatalog Table Requirements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The HCatalog table should be created before using it as part of a Sqoop job
+if the default table creation options (with optional storage stanza) are not
+sufficient. All storage formats supported by HCatalog can be used with the
+creation of the HCatalog tables. This makes this feature readily adopt new
+storage formats that come into the Hive project, such as ORC files.
+
+Support for Partitioning
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Sqoop HCatalog feature supports the following table types:
+
+* Unpartitioned tables
+
+* Partitioned tables with a static partitioning key specified
+
+* Partitioned tables with dynamic partition keys from the database
+result set
+
+* Partitioned tables with a combination of a static key and additional
+dynamic partitioning keys
+
+Schema Mapping
+~~~~~~~~~~~~~~
+
+Sqoop currently does not support column name mapping. However, the user
+is allowed to override the type mapping. Type mapping loosely follows
+the Hive type mapping already present in Sqoop except that SQL types
+“FLOAT” and “REAL” are mapped to HCatalog type “float”. In the Sqoop type
+mapping for Hive, these two are mapped to “double”. Type mapping is primarily
+used for checking the column definition correctness only and can be overridden
+with the --map-column-hive option.
+
+All types except binary are assignable to a String type.
+
+Any field of number type (int, shortint, tinyint, bigint and bigdecimal,
+float and double) is assignable to another field of any number type during
+exports and imports. Depending on the precision and scale of the target type
+of assignment, truncations can occur.
+
+Furthermore, date/time/timestamps are mapped to string (the full
+date/time/timestamp representation) or bigint (the number of milliseconds
+since epoch) during imports and exports.
+
+BLOBs and CLOBs are only supported for imports. The BLOB/CLOB objects when
+imported are stored in a Sqoop-specific format and knowledge of this format
+is needed for processing these objects in a Pig/Hive job or another Map Reduce
+job.
+
+Database column names are mapped to their lowercase equivalents when mapped
+to the HCatalog fields. Currently, case-sensitive database object names are
+not supported.
+
+Projection of a set of columns from a table to an HCatalog table or loading
+to a column projection is allowed, subject to table constraints. The dynamic
+partitioning columns, if any, must be part of the projection when importing
+data into HCatalog tables.
+
+Dynamic partitioning fields should be mapped to database columns that are
+defined with the NOT NULL attribute (although this is not validated). A null
+value during import for a dynamic partitioning column will abort the Sqoop
+job.
+
+Support for HCatalog Data Types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+All the primitive HCatalog types are supported. Currently all the complex
+HCatalog types are unsupported.
+
+BLOB/CLOB database types are only supported for imports.
+
+Providing Hive and HCatalog Libraries for the Sqoop Job
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the support for HCatalog added to Sqoop, any HCatalog job depends on a
+set of jar files being available both on the Sqoop client host and where the
+Map/Reduce tasks run. To run HCatalog jobs, the environment variable
++HADOOP_CLASSPATH+ must be set up as shown below before launching the Sqoop
+HCatalog jobs.
+
++HADOOP_CLASSPATH=$(hcat -classpath)+
++export HADOOP_CLASSPATH+
+
+
+The necessary HCatalog dependencies will be copied to the distributed cache
+automatically by the Sqoop job.
+
+Examples
+~~~~~~~~
+
+Create an HCatalog table, such as:
+
++hcat -e "create table txn(txn_date string, amount float,
+store_id int) partitioned by (cust_id string) stored as rcfile;"+
+
+
+Then Sqoop import and export of the "txn" HCatalog table can be invoked as
+follows:
+
+Import
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+
+
+Export
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop export --connect <jdbc-url> --table <table-name> --hcatalog-table txn <other sqoop options>+
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index f18d43e..4be6a6a 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -59,6 +59,10 @@ public class SqoopOptions implements Cloneable {
public static final String METASTORE_PASSWORD_KEY =
"sqoop.metastore.client.record.password";
+ // Default hive and hcat locations.
+ public static final String DEF_HIVE_HOME = "/usr/lib/hive";
+ public static final String DEF_HCAT_HOME = "/usr/lib/hcatalog";
+
public static final boolean METASTORE_PASSWORD_DEFAULT = false;
/**
@@ -151,6 +155,15 @@ public class SqoopOptions implements Cloneable {
private String hiveDelimsReplacement;
@StoredAsProperty("hive.partition.key") private String hivePartitionKey;
@StoredAsProperty("hive.partition.value") private String hivePartitionValue;
+ @StoredAsProperty("hcatalog.table.name")
+ private String hCatTableName;
+ @StoredAsProperty("hcatalog.database.name")
+ private String hCatDatabaseName;
+ @StoredAsProperty("hcatalog.create.table")
+ private boolean hCatCreateTable;
+ @StoredAsProperty("hcatalog.storage.stanza")
+ private String hCatStorageStanza;
+ private String hCatHome; // not serialized to metastore.
// User explicit mapping of types
private Properties mapColumnJava; // stored as map.colum.java
@@ -197,7 +210,9 @@ public class SqoopOptions implements Cloneable {
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
- private boolean areDelimsManuallySet;
+
+ private boolean areOutputDelimsManuallySet;
+ private boolean areInputDelimsManuallySet;
private Configuration conf;
@@ -580,7 +595,8 @@ public class SqoopOptions implements Cloneable {
// Delimiters were previously memoized; don't let the tool override
// them with defaults.
- this.areDelimsManuallySet = true;
+ this.areOutputDelimsManuallySet = true;
+ this.areInputDelimsManuallySet = true;
// If we loaded true verbose flag, we need to apply it
if (this.verbose) {
@@ -804,7 +820,21 @@ public class SqoopOptions implements Cloneable {
public static String getHiveHomeDefault() {
// Set this with $HIVE_HOME, but -Dhive.home can override.
String hiveHome = System.getenv("HIVE_HOME");
- return System.getProperty("hive.home", hiveHome);
+ hiveHome = System.getProperty("hive.home", hiveHome);
+ if (hiveHome == null) {
+ hiveHome = DEF_HIVE_HOME;
+ }
+ return hiveHome;
+ }
+
+ public static String getHCatHomeDefault() {
+ // Set this with $HCAT_HOME, but -Dhcatalog.home can override.
+ String hcatHome = System.getenv("HCAT_HOME");
+ hcatHome = System.getProperty("hcatalog.home", hcatHome);
+ if (hcatHome == null) {
+ hcatHome = DEF_HCAT_HOME;
+ }
+ return hcatHome;
}
private void initDefaults(Configuration baseConfiguration) {
@@ -813,6 +843,7 @@ public class SqoopOptions implements Cloneable {
this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
this.hiveHome = getHiveHomeDefault();
+ this.hCatHome = getHCatHomeDefault();
this.inputDelimiters = new DelimiterSet(
DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR,
@@ -834,7 +865,8 @@ public class SqoopOptions implements Cloneable {
this.jarDirIsAuto = true;
this.layout = FileLayout.TextFile;
- this.areDelimsManuallySet = false;
+ this.areOutputDelimsManuallySet = false;
+ this.areInputDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
this.useCompression = false;
@@ -1263,6 +1295,47 @@ public class SqoopOptions implements Cloneable {
this.failIfHiveTableExists = fail;
}
+ // HCatalog support
+ public void setHCatTableName(String ht) {
+ this.hCatTableName = ht;
+ }
+
+ public String getHCatTableName() {
+ return this.hCatTableName;
+ }
+
+ public void setHCatDatabaseName(String hd) {
+ this.hCatDatabaseName = hd;
+ }
+
+ public String getHCatDatabaseName() {
+ return this.hCatDatabaseName;
+ }
+
+
+ public String getHCatHome() {
+ return hCatHome;
+ }
+
+ public void setHCatHome(String home) {
+ this.hCatHome = home;
+ }
+
+ public boolean doCreateHCatalogTable() {
+ return hCatCreateTable;
+ }
+
+ public void setCreateHCatalogTable(boolean create) {
+ this.hCatCreateTable = create;
+ }
+
+ public void setHCatStorageStanza(String stanza) {
+ this.hCatStorageStanza = stanza;
+ }
+
+ public String getHCatStorageStanza() {
+ return this.hCatStorageStanza;
+ }
/**
* @return location where .java files go; guaranteed to end with '/'.
*/
@@ -1673,18 +1746,32 @@ public class SqoopOptions implements Cloneable {
this.fetchSize = size;
}
+ /*
+ * @return true if the output delimiters have been explicitly set by the user
+ */
+ public boolean explicitOutputDelims() {
+ return areOutputDelimsManuallySet;
+ }
+
/**
- * @return true if the delimiters have been explicitly set by the user.
+ * Flag the output delimiter settings as explicit user settings, or implicit.
*/
- public boolean explicitDelims() {
- return areDelimsManuallySet;
+ public void setExplicitOutputDelims(boolean explicit) {
+ this.areOutputDelimsManuallySet = explicit;
}
/**
- * Flag the delimiter settings as explicit user settings, or implicit.
+ * @return true if the input delimiters have been explicitly set by the user.
*/
- public void setExplicitDelims(boolean explicit) {
- this.areDelimsManuallySet = explicit;
+ public boolean explicitInputDelims() {
+ return areInputDelimsManuallySet;
+ }
+
+ /**
+ * Flag the input delimiter settings as explicit user settings, or implicit.
+ */
+ public void setExplicitInputDelims(boolean explicit) {
+ this.areInputDelimsManuallySet = explicit;
}
public Configuration getConf() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/config/ConfigurationConstants.java b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
index 5354063..2070b63 100644
--- a/src/java/org/apache/sqoop/config/ConfigurationConstants.java
+++ b/src/java/org/apache/sqoop/config/ConfigurationConstants.java
@@ -60,6 +60,18 @@ public final class ConfigurationConstants {
public static final String PROP_MAPRED_JOB_TRACKER_ADDRESS =
"mapred.job.tracker";
+ /**
+ * The Configuration property identifying the job tracker address (new).
+ */
+ public static final String PROP_MAPREDUCE_JOB_TRACKER_ADDRESS =
+ "mapreduce.jobtracker.address";
+
+ /**
+ * The Configuration property identifying the framework name. If set to YARN
+ * then we will not be in local mode.
+ */
+ public static final String PROP_MAPREDUCE_FRAMEWORK_NAME =
+ "mapreduce.framework.name";
/**
* The group name of task counters.
*/
@@ -78,6 +90,11 @@ public final class ConfigurationConstants {
public static final String COUNTER_MAP_INPUT_RECORDS =
"MAP_INPUT_RECORDS";
+ /**
+ * The name of the parameter for ToolRunner to set jars to add to distcache.
+ */
+ public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
+
private ConfigurationConstants() {
// Disable Explicit Object Creation
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/hive/HiveImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 838f083..02596a6 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -60,6 +60,15 @@ public class HiveImport {
private ConnManager connManager;
private Configuration configuration;
private boolean generateOnly;
+ private static boolean testMode = false;
+
+ public static boolean getTestMode() {
+ return testMode;
+ }
+
+ public static void setTestMode(boolean mode) {
+ testMode = mode;
+ }
/** Entry point through which Hive invocation should be attempted. */
private static final String HIVE_MAIN_CLASS =
@@ -285,6 +294,14 @@ public class HiveImport {
throws IOException {
SubprocessSecurityManager subprocessSM = null;
+ if (testMode) {
+ // We use external mock hive process for test mode as
+ // HCatalog dependency would have brought in Hive classes.
+ LOG.debug("Using external Hive process in test mode.");
+ executeExternalHiveScript(filename, env);
+ return;
+ }
+
try {
Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index a1ac38e..3549bda 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -164,6 +164,70 @@ public abstract class ConnManager {
return HiveTypes.toHiveType(sqlType);
}
+ /**
+ * Resolve a database-specific type to HCat data type. Largely follows Sqoop's
+ * hive translation.
+ * @param sqlType
+ * sql type
+ * @return hcat type
+ */
+ public String toHCatType(int sqlType) {
+ switch (sqlType) {
+
+ // Ideally TINYINT and SMALLINT should be mapped to their
+ // HCat equivalents tinyint and smallint respectively
+ // But the Sqoop Java type conversion has them mapped to Integer
+ // Even though the referenced Java doc clearly recommends otherwise.
+ // Changing this now can cause many of the sequence file usages to
+ // break as value class implementations will change. So, we
+ // just use the same behavior here.
+ case Types.SMALLINT:
+ case Types.TINYINT:
+ case Types.INTEGER:
+ return "int";
+
+ case Types.VARCHAR:
+ case Types.CHAR:
+ case Types.LONGVARCHAR:
+ case Types.NVARCHAR:
+ case Types.NCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ case Types.CLOB:
+ return "string";
+
+ case Types.FLOAT:
+ case Types.REAL:
+ return "float";
+
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return "string";
+
+ case Types.DOUBLE:
+ return "double";
+
+ case Types.BIT:
+ case Types.BOOLEAN:
+ return "boolean";
+
+ case Types.BIGINT:
+ return "bigint";
+
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.BLOB:
+ case Types.LONGVARBINARY:
+ return "binary";
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot convert SQL type to HCatalog type " + sqlType);
+ }
+ }
+
/**
* Resolve a database-specific type to Avro data type.
* @param sqlType sql type
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index ef1d363..5afd90c 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -63,6 +65,13 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected void configureMapper(Job job, String tableName,
String tableClassName) throws IOException {
+ if (isHCatJob) {
+ LOG.info("Configuring mapper for HCatalog import job");
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
+ job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
+ return;
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
// For text files, specify these as the output types; for
// other types, we just use the defaults.
@@ -82,6 +91,9 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (options.getHCatTableName() != null) {
+ return SqoopHCatUtilities.getImportMapperClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return TextImportMapper.class;
} else if (options.getFileLayout()
@@ -98,6 +110,10 @@ public class DataDrivenImportJob extends ImportJobBase {
@Override
protected Class<? extends OutputFormat> getOutputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ LOG.debug("Returning HCatOutputFormat for output format");
+ return SqoopHCatUtilities.getOutputFormatClass();
+ }
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return RawKeyTextOutputFormat.class;
} else if (options.getFileLayout()
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 1065d0b..d0be570 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.LoggingUtils;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
@@ -57,7 +58,7 @@ public class ExportJobBase extends JobBase {
* The (inferred) type of a file or group of files.
*/
public enum FileType {
- SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+ SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
}
public static final Log LOG = LogFactory.getLog(
@@ -80,6 +81,7 @@ public class ExportJobBase extends JobBase {
protected ExportJobContext context;
+
public ExportJobBase(final ExportJobContext ctxt) {
this(ctxt, null, null, null);
}
@@ -195,6 +197,9 @@ public class ExportJobBase extends JobBase {
* @return the Path to the files we are going to export to the db.
*/
protected Path getInputPath() throws IOException {
+ if (isHCatJob) {
+ return null;
+ }
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
inputPath = inputPath.makeQualified(FileSystem.get(conf));
@@ -207,7 +212,9 @@ public class ExportJobBase extends JobBase {
throws ClassNotFoundException, IOException {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- FileInputFormat.addInputPath(job, getInputPath());
+ if (!isHCatJob) {
+ FileInputFormat.addInputPath(job, getInputPath());
+ }
}
@Override
@@ -371,6 +378,12 @@ public class ExportJobBase extends JobBase {
}
propagateOptionsToJob(job);
+ if (isHCatJob) {
+ LOG.info("Configuring HCatalog for export job");
+ SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+ hCatUtils.configureHCat(options, job, cmgr, tableName,
+ job.getConfiguration());
+ }
configureInputFormat(job, tableName, tableClassName, null);
configureOutputFormat(job, tableName, tableClassName);
configureMapper(job, tableName, tableClassName);
@@ -448,6 +461,9 @@ public class ExportJobBase extends JobBase {
}
protected FileType getInputFileType() {
+ if (isHCatJob) {
+ return FileType.HCATALOG_MANAGED_FILE;
+ }
try {
return getFileType(context.getOptions().getConf(), getInputPath());
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 2465f3f..ab7f21e 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ public class ImportJobBase extends JobBase {
job.setOutputFormatClass(getOutputFormatClass());
+ if (isHCatJob) {
+ LOG.debug("Configuring output format for HCatalog import job");
+ SqoopHCatUtilities.configureImportOutputFormat(options, job,
+ getContext().getConnManager(), tableName, job.getConfiguration());
+ return;
+ }
+
if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
job.getConfiguration().set("mapred.output.value.class", tableClassName);
}
@@ -149,6 +157,11 @@ public class ImportJobBase extends JobBase {
perfCounters.startClock();
boolean success = doSubmitJob(job);
+
+ if (isHCatJob) {
+ SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+ }
+
perfCounters.stopClock();
Counters jobCounters = job.getCounters();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 20636a0..fee78e0 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -65,7 +66,11 @@ public class JdbcExportJob extends ExportJobBase {
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- if (fileType == FileType.AVRO_DATA_FILE) {
+ if (isHCatJob) {
+ SqoopHCatUtilities.configureExportInputFormat(options, job,
+ context.getConnManager(), tableName, job.getConfiguration());
+ return;
+ } else if (fileType == FileType.AVRO_DATA_FILE) {
LOG.debug("Configuring for Avro export");
ConnManager connManager = context.getConnManager();
Map<String, Integer> columnTypeInts;
@@ -93,6 +98,9 @@ public class JdbcExportJob extends ExportJobBase {
@Override
protected Class<? extends InputFormat> getInputFormatClass()
throws ClassNotFoundException {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getInputFormatClass();
+ }
if (fileType == FileType.AVRO_DATA_FILE) {
return AvroInputFormat.class;
}
@@ -101,6 +109,9 @@ public class JdbcExportJob extends ExportJobBase {
@Override
protected Class<? extends Mapper> getMapperClass() {
+ if (isHCatJob) {
+ return SqoopHCatUtilities.getExportMapperClass();
+ }
switch (fileType) {
case SEQUENCE_FILE:
return SequenceFileExportMapper.class;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/JobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 0df1156..322df1c 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -56,6 +56,7 @@ public class JobBase {
private Job mrJob;
private ClassLoader prevClassLoader = null;
+ protected final boolean isHCatJob;
public static final String PROPERTY_VERBOSE = "sqoop.verbose";
@@ -76,6 +77,7 @@ public class JobBase {
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
+ isHCatJob = options.getHCatTableName() != null;
}
/**
@@ -220,7 +222,7 @@ public class JobBase {
*/
protected void loadJars(Configuration conf, String ormJarFile,
String tableClassName) throws IOException {
-
+
boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
|| "local".equals(conf.get("mapred.job.tracker"));
if (isLocal) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
new file mode 100644
index 0000000..47febf7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+
+/**
+ * A combined HCatInputFormat equivalent that allows us to limit the number
+ * of splits to the number of map tasks.
+ *
+ * The logic is simple. We get the list of splits for HCatInputFormat. If it is
+ * less than the number of mappers, all is good. Else, we sort the splits by
+ * size and assign them to each of the mappers in a simple scheme. After
+ * assigning the splits to each of the mapper, for the next round we start with
+ * the mapper that got the last split. That way, the size of the split is
+ * distributed in a more uniform fashion than a simple round-robin assignment.
+ */
+public class SqoopHCatExportFormat extends HCatInputFormat {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportFormat.class.getName());
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException, InterruptedException {
+ List<InputSplit> hCatSplits = super.getSplits(job);
+ int hCatSplitCount = hCatSplits.size();
+ int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
+ if (expectedSplitCount == 0) {
+ expectedSplitCount = hCatSplitCount;
+ }
+ LOG.debug("Expected split count " + expectedSplitCount);
+ LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
+ // Sort the splits by length descending.
+
+ Collections.sort(hCatSplits, new Comparator<InputSplit>() {
+ @Override
+ public int compare(InputSplit is1, InputSplit is2) {
+ try {
+ return (int) (is2.getLength() - is1.getLength());
+ } catch (Exception e) {
+ LOG.warn("Exception caught while sorting Input splits " + e);
+ }
+ return 0;
+ }
+ });
+ List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+
+ // The number of splits generated by HCatInputFormat is within
+ // our limits
+
+ if (hCatSplitCount <= expectedSplitCount) {
+ for (InputSplit split : hCatSplits) {
+ List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
+ hcSplitList.add(split);
+ combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
+ }
+ return combinedSplits;
+ }
+ List<List<InputSplit>> combinedSplitList =
+ new ArrayList<List<InputSplit>>();
+ for (int i = 0; i < expectedSplitCount; i++) {
+ combinedSplitList.add(new ArrayList<InputSplit>());
+ }
+ boolean ascendingAssigment = true;
+
+ int lastSet = 0;
+ for (int i = 0; i < hCatSplitCount; ++i) {
+ int splitNum = i % expectedSplitCount;
+ int currentSet = i / expectedSplitCount;
+ if (currentSet != lastSet) {
+ ascendingAssigment = !ascendingAssigment;
+ }
+ if (ascendingAssigment) {
+ combinedSplitList.get(splitNum).add(hCatSplits.get(i));
+ } else {
+ combinedSplitList.
+ get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
+ }
+ lastSet = currentSet;
+ }
+ for (int i = 0; i < expectedSplitCount; i++) {
+ SqoopHCatInputSplit sqoopSplit =
+ new SqoopHCatInputSplit(combinedSplitList.get(i));
+ combinedSplits.add(sqoopSplit);
+ }
+
+ return combinedSplits;
+
+ }
+
+ @Override
+ public RecordReader<WritableComparable, HCatRecord>
+ createRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a SqoopHCatRecordReader");
+ return new SqoopHCatRecordReader(split, taskContext, this);
+ }
+
+ public RecordReader<WritableComparable, HCatRecord>
+ createHCatRecordReader(InputSplit split,
+ TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ LOG.debug("Creating a base HCatRecordReader");
+ return super.createRecordReader(split, taskContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
new file mode 100644
index 0000000..539cedf
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * A mapper that works on combined hcat splits.
+ */
+public class SqoopHCatExportMapper
+ extends
+ AutoProgressMapper<WritableComparable, HCatRecord,
+ SqoopRecord, WritableComparable> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatExportMapper.class.getName());
+ private InputJobInfo jobInfo;
+ private HCatSchema hCatFullTableSchema;
+ private List<HCatFieldSchema> hCatSchemaFields;
+
+ private SqoopRecord sqoopRecord;
+ private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+ private static final String TIME_TYPE = "java.sql.Time";
+ private static final String DATE_TYPE = "java.sql.Date";
+ private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+ private static final String FLOAT_TYPE = "Float";
+ private static final String DOUBLE_TYPE = "Double";
+ private static final String BYTE_TYPE = "Byte";
+ private static final String SHORT_TYPE = "Short";
+ private static final String INTEGER_TYPE = "Integer";
+ private static final String LONG_TYPE = "Long";
+ private static final String BOOLEAN_TYPE = "Boolean";
+ private static final String STRING_TYPE = "String";
+ private static final String BYTESWRITABLE =
+ "org.apache.hadoop.io.BytesWritable";
+ private static boolean debugHCatExportMapper = false;
+ private MapWritable colTypesJava;
+ private MapWritable colTypesSql;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ colTypesJava = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+ colTypesSql = DefaultStringifier.load(conf,
+ SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+ // Instantiate a copy of the user's class to hold and parse the record.
+
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+ debugHCatExportMapper = conf.getBoolean(
+ SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == sqoopRecord) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+ HCatSchema partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+ for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+ hCatFullTableSchema.append(hfs);
+ }
+ hCatSchemaFields = hCatFullTableSchema.getFields();
+
+ }
+
+ @Override
+ public void map(WritableComparable key, HCatRecord value,
+ Context context)
+ throws IOException, InterruptedException {
+ context.write(convertToSqoopRecord(value), NullWritable.get());
+ }
+
+ private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+ throws IOException {
+ Text key = new Text();
+ for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+ String colName = e.getKey();
+ String hfn = colName.toLowerCase();
+ key.set(hfn);
+ String javaColType = colTypesJava.get(key).toString();
+ int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+ HCatFieldSchema field =
+ hCatFullTableSchema.get(hfn);
+ HCatFieldSchema.Type fieldType = field.getType();
+ Object hCatVal =
+ hcr.get(hfn, hCatFullTableSchema);
+ String hCatTypeString = field.getTypeString();
+ Object sqlVal = convertToSqoop(hCatVal, fieldType,
+ javaColType, hCatTypeString);
+ if (debugHCatExportMapper) {
+ LOG.debug("hCatVal " + hCatVal + " of type "
+ + (hCatVal == null ? null : hCatVal.getClass().getName())
+ + ",sqlVal " + sqlVal + " of type "
+ + (sqlVal == null ? null : sqlVal.getClass().getName())
+ + ",java type " + javaColType + ", sql type = "
+ + SqoopHCatUtilities.sqlTypeString(sqlType));
+ }
+ sqoopRecord.setField(colName, sqlVal);
+ }
+ return sqoopRecord;
+ }
+
+ private Object convertToSqoop(Object val,
+ HCatFieldSchema.Type fieldType, String javaColType,
+ String hCatTypeString) throws IOException {
+
+ if (val == null) {
+ return null;
+ }
+
+ switch (fieldType) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case FLOAT:
+ case DOUBLE:
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BOOLEAN:
+ val = convertBooleanTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BIGINT:
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date((Long) val);
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time((Long) val);
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp((Long) val);
+ } else {
+ val = convertNumberTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ }
+ break;
+ case STRING:
+ val = convertStringTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case BINARY:
+ val = convertBinaryTypes(val, javaColType);
+ if (val != null) {
+ return val;
+ }
+ break;
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new IOException("Cannot convert HCatalog type "
+ + fieldType);
+ }
+ LOG.error("Cannot convert HCatalog object of "
+ + " type " + hCatTypeString + " to java object type "
+ + javaColType);
+ return null;
+ }
+
+ private Object convertBinaryTypes(Object val, String javaColType) {
+ byte[] bb = (byte[]) val;
+ if (javaColType.equals(BYTESWRITABLE)) {
+ BytesWritable bw = new BytesWritable();
+ bw.set(bb, 0, bb.length);
+ return bw;
+ }
+ return null;
+ }
+
+ private Object convertStringTypes(Object val, String javaColType) {
+ String valStr = val.toString();
+ if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(valStr);
+ } else if (javaColType.equals(DATE_TYPE)
+ || javaColType.equals(TIME_TYPE)
+ || javaColType.equals(TIMESTAMP_TYPE)) {
+ // Depending on the version, Oracle may expect timestamps even for Date
+ // columns, so allow all date/time types to be assignment compatible.
+ if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+ Date d = Date.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return d;
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(d.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(d.getTime());
+ }
+ } else if (valStr.length() == 8) { // time in hh:mm:ss
+ Time t = Time.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(t.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return t;
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return new Timestamp(t.getTime());
+ }
+ } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:mm:ss
+ Timestamp ts = Timestamp.valueOf(valStr);
+ if (javaColType.equals(DATE_TYPE)) {
+ return new Date(ts.getTime());
+ } else if (javaColType.equals(TIME_TYPE)) {
+ return new Time(ts.getTime());
+ } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+ return ts;
+ }
+ } else {
+ return null;
+ }
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return valStr;
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return Boolean.valueOf(valStr);
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return Byte.parseByte(valStr);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return Short.parseShort(valStr);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return Integer.parseInt(valStr);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return Long.parseLong(valStr);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return Float.parseFloat(valStr);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return Double.parseDouble(valStr);
+ }
+ return null;
+ }
+
+ private Object convertBooleanTypes(Object val, String javaColType) {
+ Boolean b = (Boolean) val;
+ if (javaColType.equals(BOOLEAN_TYPE)) {
+ return b;
+ } else if (javaColType.equals(BYTE_TYPE)) {
+ return (byte) (b ? 1 : 0);
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return (short) (b ? 1 : 0);
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return (int) (b ? 1 : 0);
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return (long) (b ? 1 : 0);
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return (float) (b ? 1 : 0);
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return (double) (b ? 1 : 0);
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(b ? 1 : 0);
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return val.toString();
+ }
+ return null;
+ }
+
+ private Object convertNumberTypes(Object val, String javaColType) {
+ Number n = (Number) val;
+ if (javaColType.equals(BYTE_TYPE)) {
+ return n.byteValue();
+ } else if (javaColType.equals(SHORT_TYPE)) {
+ return n.shortValue();
+ } else if (javaColType.equals(INTEGER_TYPE)) {
+ return n.intValue();
+ } else if (javaColType.equals(LONG_TYPE)) {
+ return n.longValue();
+ } else if (javaColType.equals(FLOAT_TYPE)) {
+ return n.floatValue();
+ } else if (javaColType.equals(DOUBLE_TYPE)) {
+ return n.doubleValue();
+ } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+ return new BigDecimal(n.doubleValue());
+ } else if (javaColType.equals(BOOLEAN_TYPE)) {
+ return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+ } else if (javaColType.equals(STRING_TYPE)) {
+ return n.toString();
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
new file mode 100644
index 0000000..4f0ff1b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * A mapper for HCatalog import.
+ */
+public class SqoopHCatImportMapper extends
+ SqoopMapper<WritableComparable, SqoopRecord,
+ WritableComparable, HCatRecord> {
+ public static final Log LOG = LogFactory
+ .getLog(SqoopHCatImportMapper.class.getName());
+
+ // Toggled from the job conf (DEBUG_HCAT_IMPORT_MAPPER_PROP) to emit
+ // per-field conversion traces in convertToHCatRecord().
+ private static boolean debugHCatImportMapper = false;
+
+ // HCatalog job/table metadata deserialized from the job configuration.
+ private InputJobInfo jobInfo;
+ // Data columns followed by partition columns, built in setup().
+ private HCatSchema hCatFullTableSchema;
+ private int fieldCount;
+ // When true, BigDecimal values are rendered with toPlainString().
+ private boolean bigDecimalFormatString;
+ private LargeObjectLoader lobLoader;
+ private HCatSchema partitionSchema = null;
+ private HCatSchema dataColsSchema = null;
+ // NOTE(review): the next two fields appear to be unused in this class —
+ // verify before relying on them.
+ private String stringDelimiterReplacements = null;
+ private ArrayWritable delimCharsArray;
+ private String hiveDelimsReplacement;
+ private boolean doHiveDelimsReplacement = false;
+ private DelimiterSet hiveDelimiters;
+ // Lower-cased name of the static partition key column, if any; that column
+ // is skipped when building the output record.
+ private String staticPartitionKey;
+ private int[] hCatFieldPositions;
+ // NOTE(review): colCount defaults to 0, so the (colCount == -1) check in
+ // map() never fires — confirm whether this should be initialized to -1.
+ private int colCount;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ // Recover the HCatalog input job info serialized by the job setup code.
+ String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ jobInfo =
+ (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+ dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+ partitionSchema =
+ jobInfo.getTableInfo().getPartitionColumns();
+ StringBuilder storerInfoStr = new StringBuilder(1024);
+ StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+ storerInfoStr.append("HCatalog Storer Info : ")
+ .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
+ .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+ .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+ .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+ Properties storerProperties = storerInfo.getProperties();
+ if (!storerProperties.isEmpty()) {
+ storerInfoStr.append("\nStorer properties ");
+ for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
+ String key = (String) entry.getKey();
+ Object val = entry.getValue();
+ storerInfoStr.append("\n\t").append(key).append('=').append(val);
+ }
+ }
+ storerInfoStr.append("\n");
+ LOG.info(storerInfoStr);
+
+ // The full output schema is the table's data columns followed by its
+ // partition columns.
+ hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
+ for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+ hCatFullTableSchema.append(hfs);
+ }
+ fieldCount = hCatFullTableSchema.size();
+ lobLoader = new LargeObjectLoader(conf,
+ new Path(jobInfo.getTableInfo().getTableLocation()));
+ bigDecimalFormatString = conf.getBoolean(
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+ debugHCatImportMapper = conf.getBoolean(
+ SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
+ // Delimiters were serialized into the conf as an int array; presumably
+ // four delimiter chars matching the DelimiterSet constructor order plus
+ // an enclose-required flag (0/1) — verify against the job setup code.
+ IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
+ SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
+ hiveDelimiters = new DelimiterSet(
+ (char) delimChars[0].get(), (char) delimChars[1].get(),
+ (char) delimChars[2].get(), (char) delimChars[3].get(),
+ delimChars[4].get() == 1 ? true : false);
+ hiveDelimsReplacement =
+ conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
+ if (hiveDelimsReplacement == null) {
+ hiveDelimsReplacement = "";
+ }
+ doHiveDelimsReplacement = Boolean.valueOf(conf.get(
+ SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
+
+ IntWritable[] fPos = DefaultStringifier.loadArray(conf,
+ SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
+ hCatFieldPositions = new int[fPos.length];
+ for (int i = 0; i < fPos.length; ++i) {
+ hCatFieldPositions[i] = fPos[i].get();
+ }
+
+ LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
+ LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
+ LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
+ staticPartitionKey =
+ conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
+ LOG.debug("Static partition key used : " + staticPartitionKey);
+
+
+ }
+
+ // Convert each imported SqoopRecord to an HCatRecord and emit it.
+ @Override
+ public void map(WritableComparable key, SqoopRecord value,
+ Context context)
+ throws IOException, InterruptedException {
+
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ value.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+ // NOTE(review): colCount is never -1 (the field defaults to 0), so this
+ // lazy initialization never executes — confirm intent.
+ if (colCount == -1) {
+ colCount = value.getFieldMap().size();
+ }
+ context.write(key, convertToHCatRecord(value));
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ // Release any temporary resources held by the LOB loader.
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+
+ // Build an HCatRecord from the SqoopRecord's field map, converting each
+ // value to the type required by the corresponding HCatalog column. The
+ // static partition key column (if configured) is omitted from the record.
+ private HCatRecord convertToHCatRecord(SqoopRecord sqr)
+ throws IOException {
+ Map<String, Object> fieldMap = sqr.getFieldMap();
+ HCatRecord result = new DefaultHCatRecord(fieldCount);
+
+ for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+ String key = entry.getKey();
+ Object val = entry.getValue();
+ // HCatalog column names are lower case.
+ String hfn = key.toLowerCase();
+ if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
+ continue;
+ }
+ HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+ if (debugHCatImportMapper) {
+ LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
+ + " of type " + (val == null ? null : val.getClass().getName())
+ + ", hcattype " + hfs.getTypeString());
+ }
+ Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
+
+ result.set(hfn, hCatFullTableSchema, hCatVal);
+ }
+
+ return result;
+ }
+
+
+ // Map a single field value to the object HCatalog expects for the target
+ // column type, dispatching on the runtime Java type produced by the JDBC
+ // import. Returns null (and logs an error) when no mapping exists for the
+ // value-type / column-type combination.
+ private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
+ String hCatTypeString) {
+
+ if (val == null) {
+ return null;
+ }
+
+ Object retVal = null;
+
+ if (val instanceof Number) {
+ retVal = convertNumberTypes(val, hfsType);
+ } else if (val instanceof Boolean) {
+ retVal = convertBooleanTypes(val, hfsType);
+ } else if (val instanceof String) {
+ if (hfsType == HCatFieldSchema.Type.STRING) {
+ String str = (String) val;
+ if (doHiveDelimsReplacement) {
+ // Scrub Hive delimiter characters from string data, mirroring the
+ // --hive-delims-replacement behavior of plain Hive imports.
+ retVal = FieldFormatter
+ .hiveStringReplaceDelims(str, hiveDelimsReplacement,
+ hiveDelimiters);
+ } else {
+ retVal = str;
+ }
+ }
+ } else if (val instanceof java.util.Date) {
+ retVal = converDateTypes(val, hfsType);
+ } else if (val instanceof BytesWritable) {
+ if (hfsType == HCatFieldSchema.Type.BINARY) {
+ BytesWritable bw = (BytesWritable) val;
+ // NOTE(review): getBytes() can return a backing array longer than
+ // getLength(); trailing padding bytes would be included — verify.
+ retVal = bw.getBytes();
+ }
+ } else if (val instanceof BlobRef) {
+ if (hfsType == HCatFieldSchema.Type.BINARY) {
+ BlobRef br = (BlobRef) val;
+ // External blobs contribute their reference string's bytes; inline
+ // blobs contribute the raw data.
+ byte[] bytes = br.isExternal() ? br.toString().getBytes()
+ : br.getData();
+ retVal = bytes;
+ }
+ } else if (val instanceof ClobRef) {
+ if (hfsType == HCatFieldSchema.Type.STRING) {
+ ClobRef cr = (ClobRef) val;
+ String s = cr.isExternal() ? cr.toString() : cr.getData();
+ retVal = s;
+ }
+ } else {
+ throw new UnsupportedOperationException("Objects of type "
+ + val.getClass().getName() + " are not suported");
+ }
+ if (retVal == null) {
+ LOG.error("Objects of type "
+ + val.getClass().getName() + " can not be mapped to HCatalog type "
+ + hCatTypeString);
+ }
+ return retVal;
+ }
+
+ // Convert java.sql.Date/Time/Timestamp values: BIGINT columns get the
+ // epoch-millisecond value, STRING columns get toString(); anything else
+ // yields null. (Note: the method name has a typo — "conver".)
+ private Object converDateTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ if (val instanceof java.sql.Date) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Date) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ } else if (val instanceof java.sql.Time) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Time) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ } else if (val instanceof java.sql.Timestamp) {
+ if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return ((Timestamp) val).getTime();
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ }
+ return null;
+ }
+
+ // Convert a Boolean to the HCatalog column type: numeric types get 1/0,
+ // STRING gets "true"/"false"; unsupported types yield null.
+ private Object convertBooleanTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ Boolean b = (Boolean) val;
+ if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+ return b;
+ } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
+ return (byte) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+ return (short) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.INT) {
+ return (int) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return (long) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+ return (float) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+ return (double) (b ? 1 : 0);
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return val.toString();
+ }
+ return null;
+ }
+
+ // Convert a Number to the HCatalog column type. BigDecimal values bound
+ // for STRING columns honor the bigDecimalFormatString setting
+ // (toPlainString vs. toString); all other numeric targets use the
+ // corresponding Number accessor. Unsupported targets yield null.
+ private Object convertNumberTypes(Object val,
+ HCatFieldSchema.Type hfsType) {
+ if (!(val instanceof Number)) {
+ return null;
+ }
+ if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
+ BigDecimal bd = (BigDecimal) val;
+ if (bigDecimalFormatString) {
+ return bd.toPlainString();
+ } else {
+ return bd.toString();
+ }
+ }
+ Number n = (Number) val;
+ if (hfsType == HCatFieldSchema.Type.TINYINT) {
+ return n.byteValue();
+ } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+ return n.shortValue();
+ } else if (hfsType == HCatFieldSchema.Type.INT) {
+ return n.intValue();
+ } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+ return n.longValue();
+ } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+ return n.floatValue();
+ } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+ return n.doubleValue();
+ } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+ // Any non-zero numeric value maps to TRUE.
+ return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+ } else if (hfsType == HCatFieldSchema.Type.STRING) {
+ return n.toString();
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5e88d43b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
new file mode 100644
index 0000000..5a2e48a
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * An abstraction of a combined HCatSplits.
+ *
+ */
+public class SqoopHCatInputSplit extends InputSplit implements Writable {
+ // The underlying HCatalog splits combined into this single split.
+ private List<HCatSplit> hCatSplits;
+ // Union of host locations across member splits. Not serialized by
+ // write(); rebuilt lazily in getLocations() after deserialization.
+ private String[] hCatLocations;
+ // Sum of member split lengths; computed eagerly in the constructor and
+ // lazily in getLength() as a fallback.
+ private long inputLength;
+
+ // No-arg constructor required for Writable deserialization.
+ public SqoopHCatInputSplit() {
+ }
+
+ // Combine the given HCatSplits, accumulating total length and the union
+ // of their locations.
+ public SqoopHCatInputSplit(List<InputSplit> splits) {
+ hCatSplits = new ArrayList<HCatSplit>();
+ Set<String> locations = new HashSet<String>();
+ for (int i = 0; i < splits.size(); ++i) {
+ HCatSplit hsSplit = (HCatSplit) splits.get(i);
+ hCatSplits.add(hsSplit);
+ this.inputLength += hsSplit.getLength();
+ locations.addAll(Arrays.asList(hsSplit.getLocations()));
+ }
+ this.hCatLocations = locations.toArray(new String[0]);
+ }
+
+ // Number of member splits.
+ public int length() {
+ return this.hCatSplits.size();
+ }
+
+ // Member split at the given index.
+ public HCatSplit get(int index) {
+ return this.hCatSplits.get(index);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ // Recompute lazily if not yet known (e.g. all members were zero-length
+ // at construction or the field was never set).
+ if (this.inputLength == 0L) {
+ for (HCatSplit split : this.hCatSplits) {
+ this.inputLength += split.getLength();
+ }
+ }
+ return this.inputLength;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ // Rebuild the location union lazily; hCatLocations is null after
+ // readFields() since write() does not serialize it.
+ if (this.hCatLocations == null) {
+ Set<String> locations = new HashSet<String>();
+ for (HCatSplit split : this.hCatSplits) {
+ locations.addAll(Arrays.asList(split.getLocations()));
+ }
+ this.hCatLocations = locations.toArray(new String[0]);
+ }
+ return this.hCatLocations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Serialize the total length, the member count, then each member split.
+ // Locations are intentionally omitted and recomputed on demand.
+ out.writeLong(this.inputLength);
+ out.writeInt(this.hCatSplits.size());
+ for (HCatSplit split : this.hCatSplits) {
+ split.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Mirror of write(): length, count, then each member split.
+ this.inputLength = in.readLong();
+ int size = in.readInt();
+ this.hCatSplits = new ArrayList<HCatSplit>(size);
+ for (int i = 0; i < size; ++i) {
+ HCatSplit hs = new HCatSplit();
+ hs.readFields(in);
+ hCatSplits.add(hs);
+ }
+ }
+}
+