Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/09/09 17:20:11 UTC

[3/3] git commit: SQOOP-1167: Enhance HCatalog support to allow direct mode connection manager implementations

SQOOP-1167: Enhance HCatalog support to allow direct mode connection manager implementations

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/06183f7a
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/06183f7a
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/06183f7a

Branch: refs/heads/trunk
Commit: 06183f7abae012e961f2431dbb62282a0bf2bbf0
Parents: 56b5a37
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Sep 9 08:19:11 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Sep 9 08:19:11 2013 -0700

----------------------------------------------------------------------
 build.xml                                       |  14 +
 src/docs/user/hcatalog.txt                      |  20 +-
 .../org/apache/sqoop/manager/ConnManager.java   |   8 +
 .../sqoop/manager/DirectNetezzaManager.java     |   8 +
 .../apache/sqoop/mapreduce/ExportJobBase.java   |  10 +
 .../apache/sqoop/mapreduce/ImportJobBase.java   |   8 +
 .../NetezzaExternalTableExportMapper.java       |  37 +-
 .../NetezzaExternalTableHCatExportMapper.java   |  58 +++
 .../NetezzaExternalTableHCatImportMapper.java   |  76 ++++
 .../NetezzaExternalTableImportMapper.java       |  29 +-
 .../NetezzaExternalTableTextImportMapper.java   |  41 +++
 .../db/netezza/NetezzaJDBCStatementRunner.java  |   2 +-
 .../mapreduce/hcat/SqoopHCatExportHelper.java   | 325 +++++++++++++++++
 .../mapreduce/hcat/SqoopHCatExportMapper.java   | 296 +--------------
 .../mapreduce/hcat/SqoopHCatImportHelper.java   | 316 ++++++++++++++++
 .../mapreduce/hcat/SqoopHCatImportMapper.java   | 291 +--------------
 .../netezza/NetezzaExternalTableExportJob.java  |  95 +++--
 .../netezza/NetezzaExternalTableImportJob.java  |  43 ++-
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |   5 +-
 .../com/cloudera/sqoop/ThirdPartyTests.java     |  14 +
 .../manager/DirectNetezzaExportManualTest.java  | 236 ------------
 .../sqoop/manager/NetezzaExportManualTest.java  | 259 -------------
 .../sqoop/manager/NetezzaImportManualTest.java  | 360 ------------------
 .../sqoop/manager/NetezzaTestUtils.java         |  93 -----
 .../apache/sqoop/hcat/HCatalogExportTest.java   |   4 +-
 .../apache/sqoop/hcat/HCatalogImportTest.java   |  26 +-
 .../apache/sqoop/hcat/HCatalogTestUtils.java    |   4 +-
 .../apache/sqoop/hcat/TestHCatalogBasic.java    |  22 +-
 .../netezza/DirectNetezzaExportManualTest.java  | 239 ++++++++++++
 .../DirectNetezzaHCatExportManualTest.java      | 165 +++++++++
 .../DirectNetezzaHCatImportManualTest.java      | 191 ++++++++++
 .../netezza/NetezzaExportManualTest.java        | 215 +++++++++++
 .../netezza/NetezzaImportManualTest.java        | 361 +++++++++++++++++++
 .../sqoop/manager/netezza/NetezzaTestUtils.java | 143 ++++++++
 34 files changed, 2367 insertions(+), 1647 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 39c6337..a186b68 100644
--- a/build.xml
+++ b/build.xml
@@ -322,6 +322,13 @@
   <property name="sqoop.test.db2.connectstring.username" value="SQOOP" />
   <property name="sqoop.test.db2.connectstring.password" value="SQOOP" />
 
+  <property name="sqoop.test.netezza.host" value="nz-host" />
+  <property name="sqoop.test.netezza.port" value="5480" />
+  <property name="sqoop.test.netezza.username" value="ADMIN" />
+  <property name="sqoop.test.netezza.password" value="password" />
+  <property name="sqoop.test.netezza.db.name" value="SQOOP" />
+  <property name="sqoop.test.netezza.table.name" value="EMPNZ" />
+
 
   <condition property="windows">
     <os family="windows" />
@@ -901,6 +908,13 @@
       <sysproperty key="sqoop.test.db2.connectstring.username" value="${sqoop.test.db2.connectstring.username}" />
       <sysproperty key="sqoop.test.db2.connectstring.password" value="${sqoop.test.db2.connectstring.password}" />
 
+      <sysproperty key="sqoop.test.netezza.host" value="${sqoop.test.netezza.host}" />
+      <sysproperty key="sqoop.test.netezza.port" value="${sqoop.test.netezza.port}" />
+      <sysproperty key="sqoop.test.netezza.username" value="${sqoop.test.netezza.username}" />
+      <sysproperty key="sqoop.test.netezza.password" value="${sqoop.test.netezza.password}" />
+      <sysproperty key="sqoop.test.netezza.db.name" value="${sqoop.test.netezza.db.name}" />
+      <sysproperty key="sqoop.test.netezza.table.name" value="${sqoop.test.netezza.table.name}" />
+
       <!-- Location of Hive logs -->
       <!--<sysproperty key="hive.log.dir"
                    value="${test.build.data}/sqoop/logs"/> -->

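The properties above configure the new Netezza manual tests. As an illustrative invocation only (the host, credentials, and the -Dtestcase property are assumptions, not part of this patch; -Dtestcase works only if the build's test target honors the usual Ant testcase override):

    # Hypothetical example: point the manual tests at a reachable Netezza appliance.
    ant test -Dtestcase=DirectNetezzaHCatImportManualTest \
        -Dsqoop.test.netezza.host=nzhost.example.com \
        -Dsqoop.test.netezza.username=ADMIN \
        -Dsqoop.test.netezza.password=secret \
        -Dsqoop.test.netezza.db.name=SQOOP
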
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/docs/user/hcatalog.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
index b8e495e..b1a3982 100644
--- a/src/docs/user/hcatalog.txt
+++ b/src/docs/user/hcatalog.txt
@@ -24,8 +24,8 @@ HCatalog Background
 ~~~~~~~~~~~~~~~~~~~
 
 HCatalog is a table and storage management service for Hadoop that enables
-users with different data processing tools – Pig, MapReduce, and Hive –
-to more easily read and write data on the grid. HCatalog’s table abstraction
+users with different data processing tools Pig, MapReduce, and Hive
+to more easily read and write data on the grid. HCatalog's table abstraction
 presents users with a relational view of data in the Hadoop distributed
 file system (HDFS) and ensures that users need not worry about where or
 in what format their data is stored: RCFile format, text files, or
@@ -106,6 +106,17 @@ type STRING. There can be only one static partitioning key.
 +--hive-partition-value+::
 The value associated with the partition.
 
+Direct Mode support
+^^^^^^^^^^^^^^^^^^^
+
+HCatalog integration in Sqoop has been enhanced to support direct mode
+connectors (high-performance connectors specific to a database). The
+Netezza direct mode connector has been enhanced to take advantage of this
+feature.
+
+IMPORTANT: Only the Netezza direct mode connector is currently enabled to
+work with HCatalog.
+
 Unsupported Sqoop Options
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -122,7 +133,6 @@ Unsupported Sqoop Export and Import Options
 
 The following Sqoop export and import options are not supported with HCatalog jobs.
 
-* +--direct+
 * +--export-dir+
 * +--target-dir+
 * +--warehouse-dir+
@@ -231,8 +241,8 @@ Schema Mapping
 Sqoop currently does not support column name mapping. However, the user
 is allowed to override the type mapping. Type mapping loosely follows
 the Hive type mapping already present in Sqoop except that SQL types
-“FLOAT” and “REAL” are mapped to HCatalog type “float”. In the Sqoop type
-mapping for Hive, these two are mapped to “double”. Type mapping is primarily
+FLOAT and REAL are mapped to HCatalog type float. In the Sqoop type
+mapping for Hive, these two are mapped to double. Type mapping is primarily
 used for checking the column definition correctness only and can be overridden
 with the --map-column-hive option.
 

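As a usage sketch of the documented behavior (connection string, credentials, and table names below are placeholders; --hcatalog-table is the pre-existing HCatalog option that this patch allows to be combined with --direct):

    # Placeholder host/credentials; with this change --direct and
    # --hcatalog-table can be used together for the Netezza connector.
    $ sqoop import --connect jdbc:netezza://nzhost:5480/SQOOP \
        --username ADMIN --password secret \
        --table EMPNZ --direct \
        --hcatalog-table empnz_hcat
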
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index f4b22f9..773d246 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -773,5 +773,13 @@ public abstract class ConnManager {
         || (columnType == Types.LONGNVARCHAR);
   }
 
+  /**
+   * Determine whether HCatalog integration is allowed from the connector's
+   * direct mode.  By default, direct mode is not compatible with HCatalog.
+   * @return whether direct-mode HCatalog integration is allowed.
+   */
+  public boolean isDirectModeHCatSupported() {
+    return false;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
index 4f36bf6..89deceb 100644
--- a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
@@ -281,6 +281,14 @@ public class DirectNetezzaManager extends NetezzaManager {
 
   @Override
   public boolean isORMFacilitySelfManaged() {
+    if (options.getHCatTableName() != null) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isDirectModeHCatSupported() {
     return true;
   }
 }

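For other connection managers, opting in would mirror the two changes above: ConnManager defaults the new hook to false, and a direct-mode manager that can feed HCatalog overrides it. A minimal sketch, assuming a hypothetical manager built on the existing GenericJdbcManager class (the class name and driver below are illustrative, not part of this patch):

    package org.apache.sqoop.manager;

    import com.cloudera.sqoop.SqoopOptions;

    /**
     * Illustrative only: a direct-mode connection manager that can feed
     * HCatalog opts in by overriding the hook added to ConnManager above.
     */
    public class HypotheticalDirectManager extends GenericJdbcManager {

      public HypotheticalDirectManager(SqoopOptions opts) {
        super("com.example.jdbc.Driver", opts); // hypothetical JDBC driver
      }

      @Override
      public boolean isDirectModeHCatSupported() {
        return true; // ConnManager's default is false
      }
    }
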
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index d0be570..53de9c9 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -315,6 +315,16 @@ public class ExportJobBase extends JobBase {
 
     String tableName = outputTableName;
     boolean stagingEnabled = false;
+
+    // Check if there are runtime error checks to do
+    if (isHCatJob && options.isDirect()
+        && !context.getConnManager().isDirectModeHCatSupported()) {
+      throw new IOException("Direct import is not compatible with "
+        + "HCatalog operations using the connection manager "
+        + context.getConnManager().getClass().getName()
+        + ". Please remove the parameter --direct");
+    }
+
     if (stagingTableName != null) { // user has specified the staging table
       if (cmgr.supportsStagingForExport()) {
         LOG.info("Data will be staged in the table: " + stagingTableName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index ab7f21e..36959e1 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -203,6 +203,14 @@ public class ImportJobBase extends JobBase {
    */
   public void runImport(String tableName, String ormJarFile, String splitByCol,
       Configuration conf) throws IOException, ImportException {
+    // Check if there are runtime error checks to do
+    if (isHCatJob && options.isDirect()
+        && !context.getConnManager().isDirectModeHCatSupported()) {
+      throw new IOException("Direct import is not compatible with "
+        + "HCatalog operations using the connection manager "
+        + context.getConnManager().getClass().getName()
+        + ". Please remove the parameter --direct");
+    }
 
     if (null != tableName) {
       LOG.info("Beginning import of " + tableName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
index 22b7af5..3613ff2 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -45,7 +45,7 @@ import com.cloudera.sqoop.lib.DelimiterSet;
  * Netezza export mapper using external tables.
  */
 public abstract class NetezzaExternalTableExportMapper<K, V> extends
-    SqoopMapper<K, V, NullWritable, NullWritable> {
+  SqoopMapper<K, V, NullWritable, NullWritable> {
   /**
    * Create a named FIFO, and start the Netezza JDBC thread connected to that
    * FIFO. A File object representing the FIFO is in 'fifoFile'.
@@ -57,27 +57,27 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
   private Connection con;
   private OutputStream recordWriter;
   public static final Log LOG = LogFactory
-      .getLog(NetezzaExternalTableImportMapper.class.getName());
+    .getLog(NetezzaExternalTableImportMapper.class.getName());
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
   private DelimiterSet outputDelimiters;
 
-  private String getSqlStatement() throws IOException {
+  private String getSqlStatement(DelimiterSet delimiters) throws IOException {
 
-    char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
-    char qc = (char) conf.getInt(DelimiterSet.INPUT_ENCLOSED_BY_KEY, 0);
-    char ec = (char) conf.getInt(DelimiterSet.INPUT_ESCAPED_BY_KEY, 0);
+    char fd = delimiters.getFieldsTerminatedBy();
+    char qc = delimiters.getEnclosedBy();
+    char ec = delimiters.getEscapedBy();
 
     String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
 
     int errorThreshold = conf.getInt(
-        DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+      DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
     String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
 
     StringBuilder sqlStmt = new StringBuilder(2048);
 
     sqlStmt.append("INSERT INTO ");
-    sqlStmt.append(dbc.getInputTableName());
+    sqlStmt.append(dbc.getOutputTableName());
     sqlStmt.append(" SELECT * FROM EXTERNAL '");
     sqlStmt.append(fifoFile.getAbsolutePath());
     sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
@@ -133,11 +133,16 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
   }
 
   private void initNetezzaExternalTableExport(Context context)
-      throws IOException {
+    throws IOException {
     this.conf = context.getConfiguration();
     dbc = new DBConfiguration(conf);
     File taskAttemptDir = TaskId.getLocalWorkPath(conf);
-    this.outputDelimiters = new DelimiterSet(',', '\n', '\000', '\\', false);
+
+    char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.INPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.INPUT_ESCAPED_BY_KEY, 0);
+
+    this.outputDelimiters = new DelimiterSet(fd, '\n', qc, ec, false);
     this.fifoFile = new File(taskAttemptDir, ("nzexttable-export.txt"));
     String filename = fifoFile.toString();
     NamedFifo nf;
@@ -150,14 +155,14 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
       LOG.error("Could not create FIFO file " + filename);
       this.fifoFile = null;
       throw new IOException(
-          "Could not create FIFO for netezza external table import", ioe);
+        "Could not create FIFO for netezza external table import", ioe);
     }
-    String sqlStmt = getSqlStatement();
+    String sqlStmt = getSqlStatement(outputDelimiters);
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
       extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
-          con, sqlStmt);
+        con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
       throw new IOException(sqle);
@@ -204,14 +209,14 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
         LOG.info("Transferred " + counter.toString());
         if (extTableThread.hasExceptions()) {
           extTableThread.printException();
-          throw new IOException(extTableThread.getExcepton());
+          throw new IOException(extTableThread.getException());
         }
       }
     }
   }
 
   protected void writeTextRecord(Text record) throws IOException,
-      InterruptedException {
+    InterruptedException {
     String outputStr = record.toString() + "\n";
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);
@@ -219,7 +224,7 @@ public abstract class NetezzaExternalTableExportMapper<K, V> extends
   }
 
   protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
-      InterruptedException {
+    InterruptedException {
     String outputStr = sqr.toString(this.outputDelimiters);
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatExportMapper.java
new file mode 100644
index 0000000..a139090
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatExportMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;
+
+/**
+ * Netezza export mapper using external tables for HCat integration.
+ */
+public class NetezzaExternalTableHCatExportMapper extends
+  NetezzaExternalTableExportMapper<LongWritable, HCatRecord> {
+  private SqoopHCatExportHelper helper;
+  public static final Log LOG = LogFactory
+    .getLog(NetezzaExternalTableHCatExportMapper.class.getName());
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+    helper = new SqoopHCatExportHelper(conf);
+    // Force escaped by
+    conf.setInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, '\'');
+
+  }
+
+  @Override
+  public void map(LongWritable key, HCatRecord hcr, Context context)
+    throws IOException, InterruptedException {
+    SqoopRecord sqr = helper.convertToSqoopRecord(hcr);
+    writeSqoopRecord(sqr);
+    context.progress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatImportMapper.java
new file mode 100644
index 0000000..6f163e9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableHCatImportMapper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.lib.RecordParser;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatImportHelper;
+
+/**
+ * Netezza import mapper using external tables for HCat integration.
+ */
+public class NetezzaExternalTableHCatImportMapper
+  extends NetezzaExternalTableImportMapper<NullWritable, HCatRecord> {
+  private SqoopHCatImportHelper helper;
+  private SqoopRecord sqoopRecord;
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    helper = new SqoopHCatImportHelper(conf);
+    String recordClassName = conf.get(ConfigurationHelper
+      .getDbInputClassProperty());
+    if (null == recordClassName) {
+      throw new IOException("DB Input class name is not set!");
+    }
+    try {
+      Class<?> cls = Class.forName(recordClassName, true,
+        Thread.currentThread().getContextClassLoader());
+      sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == sqoopRecord) {
+      throw new IOException("Could not instantiate object of type "
+        + recordClassName);
+    }
+  }
+
+  @Override
+  protected void writeRecord(Text text, Context context)
+    throws IOException, InterruptedException {
+    try {
+      sqoopRecord.parse(text);
+      context.write(NullWritable.get(),
+        helper.convertToHCatRecord(sqoopRecord));
+    } catch (RecordParser.ParseError pe) {
+      throw new IOException("Exception parsing netezza import record", pe);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
index bcdc9e1..2f4c152 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -35,16 +35,17 @@ import org.apache.sqoop.config.ConfigurationHelper;
 import org.apache.sqoop.io.NamedFifo;
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.manager.DirectNetezzaManager;
-import org.apache.sqoop.mapreduce.SqoopMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
 
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
 /**
  * Netezza import mapper using external tables.
  */
-public class NetezzaExternalTableImportMapper extends
-    SqoopMapper<Integer, NullWritable, Text, NullWritable> {
+public abstract class NetezzaExternalTableImportMapper<K, V> extends
+  AutoProgressMapper<Integer, NullWritable, K, V> {
   /**
    * Create a named FIFO, and start Netezza import connected to that FIFO. A
    * File object representing the FIFO is in 'fifoFile'.
@@ -57,7 +58,7 @@ public class NetezzaExternalTableImportMapper extends
   private Connection con;
   private BufferedReader recordReader;
   public static final Log LOG = LogFactory
-      .getLog(NetezzaExternalTableImportMapper.class.getName());
+    .getLog(NetezzaExternalTableImportMapper.class.getName());
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
 
@@ -70,7 +71,7 @@ public class NetezzaExternalTableImportMapper extends
     String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
 
     int errorThreshold = conf.getInt(
-        DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+      DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
     String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
     String[] cols = dbc.getOutputFieldNames();
     String inputConds = dbc.getInputConditions();
@@ -142,7 +143,7 @@ public class NetezzaExternalTableImportMapper extends
 
     String stmt = sqlStmt.toString();
     LOG.debug("SQL generated for external table import for data slice " + myId
-        + "=" + stmt);
+      + "=" + stmt);
     return stmt;
   }
 
@@ -162,14 +163,14 @@ public class NetezzaExternalTableImportMapper extends
       LOG.error("Could not create FIFO file " + filename);
       this.fifoFile = null;
       throw new IOException(
-          "Could not create FIFO for netezza external table import", ioe);
+        "Could not create FIFO for netezza external table import", ioe);
     }
     String sqlStmt = getSqlStatement(myId);
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
       extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
-          con, sqlStmt);
+        con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
       throw new IOException(sqle);
@@ -187,12 +188,16 @@ public class NetezzaExternalTableImportMapper extends
     }
     extTableThread.start();
     // We need to start the reader end first
+
     recordReader = new BufferedReader(new InputStreamReader(
-        new FileInputStream(nf.getFile())));
+      new FileInputStream(nf.getFile())));
   }
 
+  abstract protected void writeRecord(Text text, Context context)
+    throws IOException, InterruptedException;
+
   public void map(Integer dataSliceId, NullWritable val, Context context)
-      throws IOException, InterruptedException {
+    throws IOException, InterruptedException {
     conf = context.getConfiguration();
     dbc = new DBConfiguration(conf);
     numMappers = ConfigurationHelper.getConfNumMaps(conf);
@@ -214,7 +219,7 @@ public class NetezzaExternalTableImportMapper extends
           // May be we should set the output to be String for faster performance
           // There is no real benefit in changing it to Text and then
           // converting it back in our case
-          context.write(outputRecord, NullWritable.get());
+          writeRecord(outputRecord, context);
           counter.addBytes(1 + inputRecord.length());
           inputRecord = recordReader.readLine();
         }
@@ -225,7 +230,7 @@ public class NetezzaExternalTableImportMapper extends
         LOG.info("Transferred " + counter.toString());
         if (extTableThread.hasExceptions()) {
           extTableThread.printException();
-          throw new IOException(extTableThread.getExcepton());
+          throw new IOException(extTableThread.getException());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextImportMapper.java
new file mode 100644
index 0000000..acc4a2a
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextImportMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatImportHelper;
+
+/**
+ * Netezza import mapper using external tables for text formats.
+ */
+public class NetezzaExternalTableTextImportMapper
+  extends NetezzaExternalTableImportMapper<Text, NullWritable> {
+
+  @Override
+  protected void writeRecord(Text text, Context context)
+    throws IOException, InterruptedException {
+    // May be we should set the output to be String for faster performance
+    // There is no real benefit in changing it to Text and then
+    // converting it back in our case
+    context.write(text, NullWritable.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
index 3a5df40..cedfd23 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
@@ -51,7 +51,7 @@ public class NetezzaJDBCStatementRunner extends Thread {
     }
   }
 
-  public Throwable getExcepton() {
+  public Throwable getException() {
     if (!hasExceptions()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportHelper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportHelper.java
new file mode 100644
index 0000000..e48f6d6
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportHelper.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * Helper class for Sqoop HCat Integration export jobs.
+ */
+public class SqoopHCatExportHelper {
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatExportHelper.class.getName());
+  private SqoopRecord sqoopRecord;
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+  private static final String TIME_TYPE = "java.sql.Time";
+  private static final String DATE_TYPE = "java.sql.Date";
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+  private static final String FLOAT_TYPE = "Float";
+  private static final String DOUBLE_TYPE = "Double";
+  private static final String BYTE_TYPE = "Byte";
+  private static final String SHORT_TYPE = "Short";
+  private static final String INTEGER_TYPE = "Integer";
+  private static final String LONG_TYPE = "Long";
+  private static final String BOOLEAN_TYPE = "Boolean";
+  private static final String STRING_TYPE = "String";
+  private static final String BYTESWRITABLE =
+    "org.apache.hadoop.io.BytesWritable";
+  private static boolean debugHCatExportMapper = false;
+  private MapWritable colTypesJava;
+  private MapWritable colTypesSql;
+
+  public SqoopHCatExportHelper(Configuration conf)
+    throws IOException, InterruptedException {
+
+    colTypesJava = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+    colTypesSql = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+    // Instantiate a copy of the user's class to hold and parse the record.
+
+    String recordClassName = conf.get(
+      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+        + ") is not set!");
+    }
+    debugHCatExportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+    try {
+      Class<?> cls = Class.forName(recordClassName, true,
+        Thread.currentThread().getContextClassLoader());
+      sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == sqoopRecord) {
+      throw new IOException("Could not instantiate object of type "
+        + recordClassName);
+    }
+
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo =
+      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+    HCatSchema partitionSchema =
+      jobInfo.getTableInfo().getPartitionColumns();
+    hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+  }
+
+  public SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+    throws IOException {
+    Text key = new Text();
+    for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+      String colName = e.getKey();
+      String hfn = colName.toLowerCase();
+      key.set(hfn);
+      String javaColType = colTypesJava.get(key).toString();
+      int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+      HCatFieldSchema field =
+        hCatFullTableSchema.get(hfn);
+      HCatFieldSchema.Type fieldType = field.getType();
+      Object hCatVal =
+        hcr.get(hfn, hCatFullTableSchema);
+      String hCatTypeString = field.getTypeString();
+      Object sqlVal = convertToSqoop(hCatVal, fieldType,
+        javaColType, hCatTypeString);
+      if (debugHCatExportMapper) {
+        LOG.debug("hCatVal " + hCatVal + " of type "
+          + (hCatVal == null ? null : hCatVal.getClass().getName())
+          + ",sqlVal " + sqlVal + " of type "
+          + (sqlVal == null ? null : sqlVal.getClass().getName())
+          + ",java type " + javaColType + ", sql type = "
+          + SqoopHCatUtilities.sqlTypeString(sqlType));
+      }
+      sqoopRecord.setField(colName, sqlVal);
+    }
+    return sqoopRecord;
+  }
+
+  private Object convertToSqoop(Object val,
+    HCatFieldSchema.Type fieldType, String javaColType,
+    String hCatTypeString) throws IOException {
+
+    if (val == null) {
+      return null;
+    }
+
+    switch (fieldType) {
+      case INT:
+      case TINYINT:
+      case SMALLINT:
+      case FLOAT:
+      case DOUBLE:
+        val = convertNumberTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BOOLEAN:
+        val = convertBooleanTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BIGINT:
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date((Long) val);
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time((Long) val);
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) val);
+        } else {
+          val = convertNumberTypes(val, javaColType);
+          if (val != null) {
+            return val;
+          }
+        }
+        break;
+      case STRING:
+        val = convertStringTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BINARY:
+        val = convertBinaryTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case ARRAY:
+      case MAP:
+      case STRUCT:
+      default:
+        throw new IOException("Cannot convert HCatalog type "
+          + fieldType);
+    }
+    LOG.error("Cannot convert HCatalog object of "
+      + " type " + hCatTypeString + " to java object type "
+      + javaColType);
+    return null;
+  }
+
+  private Object convertBinaryTypes(Object val, String javaColType) {
+    byte[] bb = (byte[]) val;
+    if (javaColType.equals(BYTESWRITABLE)) {
+      BytesWritable bw = new BytesWritable();
+      bw.set(bb, 0, bb.length);
+      return bw;
+    }
+    return null;
+  }
+
+  private Object convertStringTypes(Object val, String javaColType) {
+    String valStr = val.toString();
+    if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(valStr);
+    } else if (javaColType.equals(DATE_TYPE)
+      || javaColType.equals(TIME_TYPE)
+      || javaColType.equals(TIMESTAMP_TYPE)) {
+      // Oracle expects timestamps for Date also by default based on version
+      // Just allow all date types to be assignment compatible
+      if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+        Date d = Date.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return d;
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(d.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(d.getTime());
+        }
+      } else if (valStr.length() == 8) { // time in hh:mm:ss
+        Time t = Time.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(t.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return t;
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(t.getTime());
+        }
+      } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:ss:mm
+        Timestamp ts = Timestamp.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(ts.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(ts.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return ts;
+        }
+      } else {
+        return null;
+      }
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return valStr;
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return Boolean.valueOf(valStr);
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return Byte.parseByte(valStr);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return Short.parseShort(valStr);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return Integer.parseInt(valStr);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return Long.parseLong(valStr);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return Float.parseFloat(valStr);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return Double.parseDouble(valStr);
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val, String javaColType) {
+    Boolean b = (Boolean) val;
+    if (javaColType.equals(BOOLEAN_TYPE)) {
+      return b;
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return (byte) (b ? 1 : 0);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return (short) (b ? 1 : 0);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return (int) (b ? 1 : 0);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return (long) (b ? 1 : 0);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return (float) (b ? 1 : 0);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return (double) (b ? 1 : 0);
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(b ? 1 : 0);
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val, String javaColType) {
+    Number n = (Number) val;
+    if (javaColType.equals(BYTE_TYPE)) {
+      return n.byteValue();
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return n.shortValue();
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return n.intValue();
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return n.longValue();
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return n.floatValue();
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return n.doubleValue();
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(n.doubleValue());
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return n.toString();
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
index 539cedf..c7e9b8e 100644
--- a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
@@ -19,33 +19,15 @@
 package org.apache.sqoop.mapreduce.hcat;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DefaultStringifier;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.mapreduce.AutoProgressMapper;
-import org.apache.sqoop.mapreduce.ExportJobBase;
 
 /**
  * A mapper that works on combined hcat splits.
@@ -56,28 +38,7 @@ public class SqoopHCatExportMapper
   SqoopRecord, WritableComparable> {
   public static final Log LOG = LogFactory
     .getLog(SqoopHCatExportMapper.class.getName());
-  private InputJobInfo jobInfo;
-  private HCatSchema hCatFullTableSchema;
-  private List<HCatFieldSchema> hCatSchemaFields;
-
-  private SqoopRecord sqoopRecord;
-  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
-  private static final String TIME_TYPE = "java.sql.Time";
-  private static final String DATE_TYPE = "java.sql.Date";
-  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
-  private static final String FLOAT_TYPE = "Float";
-  private static final String DOUBLE_TYPE = "Double";
-  private static final String BYTE_TYPE = "Byte";
-  private static final String SHORT_TYPE = "Short";
-  private static final String INTEGER_TYPE = "Integer";
-  private static final String LONG_TYPE = "Long";
-  private static final String BOOLEAN_TYPE = "Boolean";
-  private static final String STRING_TYPE = "String";
-  private static final String BYTESWRITABLE =
-    "org.apache.hadoop.io.BytesWritable";
-  private static boolean debugHCatExportMapper = false;
-  private MapWritable colTypesJava;
-  private MapWritable colTypesSql;
+  private SqoopHCatExportHelper helper;
 
   @Override
   protected void setup(Context context)
@@ -85,265 +46,14 @@ public class SqoopHCatExportMapper
     super.setup(context);
 
     Configuration conf = context.getConfiguration();
-
-    colTypesJava = DefaultStringifier.load(conf,
-      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
-    colTypesSql = DefaultStringifier.load(conf,
-      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
-    // Instantiate a copy of the user's class to hold and parse the record.
-
-    String recordClassName = conf.get(
-      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
-    if (null == recordClassName) {
-      throw new IOException("Export table class name ("
-        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
-        + ") is not set!");
-    }
-    debugHCatExportMapper = conf.getBoolean(
-      SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
-    try {
-      Class cls = Class.forName(recordClassName, true,
-        Thread.currentThread().getContextClassLoader());
-      sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-
-    if (null == sqoopRecord) {
-      throw new IOException("Could not instantiate object of type "
-        + recordClassName);
-    }
-
-    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-    jobInfo =
-      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
-    HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
-    HCatSchema partitionSchema =
-      jobInfo.getTableInfo().getPartitionColumns();
-    hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
-    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
-      hCatFullTableSchema.append(hfs);
-    }
-    hCatSchemaFields = hCatFullTableSchema.getFields();
-
+    helper = new SqoopHCatExportHelper(conf);
   }
 
   @Override
   public void map(WritableComparable key, HCatRecord value,
     Context context)
     throws IOException, InterruptedException {
-    context.write(convertToSqoopRecord(value), NullWritable.get());
-  }
-
-  private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
-    throws IOException {
-    Text key = new Text();
-    for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
-      String colName = e.getKey();
-      String hfn = colName.toLowerCase();
-      key.set(hfn);
-      String javaColType = colTypesJava.get(key).toString();
-      int sqlType = ((IntWritable) colTypesSql.get(key)).get();
-      HCatFieldSchema field =
-        hCatFullTableSchema.get(hfn);
-      HCatFieldSchema.Type fieldType = field.getType();
-      Object hCatVal =
-        hcr.get(hfn, hCatFullTableSchema);
-      String hCatTypeString = field.getTypeString();
-      Object sqlVal = convertToSqoop(hCatVal, fieldType,
-        javaColType, hCatTypeString);
-      if (debugHCatExportMapper) {
-        LOG.debug("hCatVal " + hCatVal + " of type "
-          + (hCatVal == null ? null : hCatVal.getClass().getName())
-          + ",sqlVal " + sqlVal + " of type "
-          + (sqlVal == null ? null : sqlVal.getClass().getName())
-          + ",java type " + javaColType + ", sql type = "
-          + SqoopHCatUtilities.sqlTypeString(sqlType));
-      }
-      sqoopRecord.setField(colName, sqlVal);
-    }
-    return sqoopRecord;
-  }
-
-  private Object convertToSqoop(Object val,
-    HCatFieldSchema.Type fieldType, String javaColType,
-    String hCatTypeString) throws IOException {
-
-    if (val == null) {
-      return null;
-    }
-
-    switch (fieldType) {
-      case INT:
-      case TINYINT:
-      case SMALLINT:
-      case FLOAT:
-      case DOUBLE:
-        val = convertNumberTypes(val, javaColType);
-        if (val != null) {
-          return val;
-        }
-        break;
-      case BOOLEAN:
-        val = convertBooleanTypes(val, javaColType);
-        if (val != null) {
-          return val;
-        }
-        break;
-      case BIGINT:
-        if (javaColType.equals(DATE_TYPE)) {
-          return new Date((Long) val);
-        } else if (javaColType.equals(TIME_TYPE)) {
-          return new Time((Long) val);
-        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
-          return new Timestamp((Long) val);
-        } else {
-          val = convertNumberTypes(val, javaColType);
-          if (val != null) {
-            return val;
-          }
-        }
-        break;
-      case STRING:
-        val = convertStringTypes(val, javaColType);
-        if (val != null) {
-          return val;
-        }
-        break;
-      case BINARY:
-        val = convertBinaryTypes(val, javaColType);
-        if (val != null) {
-          return val;
-        }
-        break;
-      case ARRAY:
-      case MAP:
-      case STRUCT:
-      default:
-        throw new IOException("Cannot convert HCatalog type "
-          + fieldType);
-    }
-    LOG.error("Cannot convert HCatalog object of "
-      + " type " + hCatTypeString + " to java object type "
-      + javaColType);
-    return null;
-  }
-
-  private Object convertBinaryTypes(Object val, String javaColType) {
-    byte[] bb = (byte[]) val;
-    if (javaColType.equals(BYTESWRITABLE)) {
-      BytesWritable bw = new BytesWritable();
-      bw.set(bb, 0, bb.length);
-      return bw;
-    }
-    return null;
-  }
-
-  private Object convertStringTypes(Object val, String javaColType) {
-    String valStr = val.toString();
-    if (javaColType.equals(BIG_DECIMAL_TYPE)) {
-      return new BigDecimal(valStr);
-    } else if (javaColType.equals(DATE_TYPE)
-      || javaColType.equals(TIME_TYPE)
-      || javaColType.equals(TIMESTAMP_TYPE)) {
-      // Oracle expects timestamps for Date also by default based on version
-      // Just allow all date types to be assignment compatible
-      if (valStr.length() == 10) { // Date in yyyy-mm-dd format
-        Date d = Date.valueOf(valStr);
-        if (javaColType.equals(DATE_TYPE)) {
-          return d;
-        } else if (javaColType.equals(TIME_TYPE)) {
-          return new Time(d.getTime());
-        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
-          return new Timestamp(d.getTime());
-        }
-      } else if (valStr.length() == 8) { // time in hh:mm:ss
-        Time t = Time.valueOf(valStr);
-        if (javaColType.equals(DATE_TYPE)) {
-          return new Date(t.getTime());
-        } else if (javaColType.equals(TIME_TYPE)) {
-          return t;
-        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
-          return new Timestamp(t.getTime());
-        }
-      } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:ss:mm
-        Timestamp ts = Timestamp.valueOf(valStr);
-        if (javaColType.equals(DATE_TYPE)) {
-          return new Date(ts.getTime());
-        } else if (javaColType.equals(TIME_TYPE)) {
-          return new Time(ts.getTime());
-        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
-          return ts;
-        }
-      } else {
-        return null;
-      }
-    } else if (javaColType.equals(STRING_TYPE)) {
-      return valStr;
-    } else if (javaColType.equals(BOOLEAN_TYPE)) {
-      return Boolean.valueOf(valStr);
-    } else if (javaColType.equals(BYTE_TYPE)) {
-      return Byte.parseByte(valStr);
-    } else if (javaColType.equals(SHORT_TYPE)) {
-      return Short.parseShort(valStr);
-    } else if (javaColType.equals(INTEGER_TYPE)) {
-      return Integer.parseInt(valStr);
-    } else if (javaColType.equals(LONG_TYPE)) {
-      return Long.parseLong(valStr);
-    } else if (javaColType.equals(FLOAT_TYPE)) {
-      return Float.parseFloat(valStr);
-    } else if (javaColType.equals(DOUBLE_TYPE)) {
-      return Double.parseDouble(valStr);
-    }
-    return null;
-  }
-
-  private Object convertBooleanTypes(Object val, String javaColType) {
-    Boolean b = (Boolean) val;
-    if (javaColType.equals(BOOLEAN_TYPE)) {
-      return b;
-    } else if (javaColType.equals(BYTE_TYPE)) {
-      return (byte) (b ? 1 : 0);
-    } else if (javaColType.equals(SHORT_TYPE)) {
-      return (short) (b ? 1 : 0);
-    } else if (javaColType.equals(INTEGER_TYPE)) {
-      return (int) (b ? 1 : 0);
-    } else if (javaColType.equals(LONG_TYPE)) {
-      return (long) (b ? 1 : 0);
-    } else if (javaColType.equals(FLOAT_TYPE)) {
-      return (float) (b ? 1 : 0);
-    } else if (javaColType.equals(DOUBLE_TYPE)) {
-      return (double) (b ? 1 : 0);
-    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
-      return new BigDecimal(b ? 1 : 0);
-    } else if (javaColType.equals(STRING_TYPE)) {
-      return val.toString();
-    }
-    return null;
-  }
-
-  private Object convertNumberTypes(Object val, String javaColType) {
-    Number n = (Number) val;
-    if (javaColType.equals(BYTE_TYPE)) {
-      return n.byteValue();
-    } else if (javaColType.equals(SHORT_TYPE)) {
-      return n.shortValue();
-    } else if (javaColType.equals(INTEGER_TYPE)) {
-      return n.intValue();
-    } else if (javaColType.equals(LONG_TYPE)) {
-      return n.longValue();
-    } else if (javaColType.equals(FLOAT_TYPE)) {
-      return n.floatValue();
-    } else if (javaColType.equals(DOUBLE_TYPE)) {
-      return n.doubleValue();
-    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
-      return new BigDecimal(n.doubleValue());
-    } else if (javaColType.equals(BOOLEAN_TYPE)) {
-      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
-    } else if (javaColType.equals(STRING_TYPE)) {
-      return n.toString();
-    }
-    return null;
+    context.write(helper.convertToSqoopRecord(value), NullWritable.get());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
new file mode 100644
index 0000000..e9606ad
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * Helper class for Sqoop HCat Integration import jobs.
+ */
+public class SqoopHCatImportHelper {
+  public static final Log LOG = LogFactory.getLog(SqoopHCatImportHelper.class
+    .getName());
+
+  private static boolean debugHCatImportMapper = false;
+
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  private int fieldCount;
+  private boolean bigDecimalFormatString;
+  private LargeObjectLoader lobLoader;
+  private HCatSchema partitionSchema = null;
+  private HCatSchema dataColsSchema = null;
+  private String hiveDelimsReplacement;
+  private boolean doHiveDelimsReplacement = false;
+  private DelimiterSet hiveDelimiters;
+  private String staticPartitionKey;
+  private int[] hCatFieldPositions;
+  private int colCount;
+
+  public SqoopHCatImportHelper(Configuration conf) throws IOException,
+    InterruptedException {
+
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo = (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+    partitionSchema = jobInfo.getTableInfo().getPartitionColumns();
+    StringBuilder storerInfoStr = new StringBuilder(1024);
+    StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+    storerInfoStr.append("HCatalog Storer Info : ").append("\n\tHandler = ")
+      .append(storerInfo.getStorageHandlerClass())
+      .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+      .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+      .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+    Properties storerProperties = storerInfo.getProperties();
+    if (!storerProperties.isEmpty()) {
+      storerInfoStr.append("\nStorer properties ");
+      for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
+        String key = (String) entry.getKey();
+        Object val = entry.getValue();
+        storerInfoStr.append("\n\t").append(key).append('=').append(val);
+      }
+    }
+    storerInfoStr.append("\n");
+    LOG.info(storerInfoStr);
+
+    hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+    fieldCount = hCatFullTableSchema.size();
+    lobLoader = new LargeObjectLoader(conf, new Path(jobInfo.getTableInfo()
+      .getTableLocation()));
+    bigDecimalFormatString = conf.getBoolean(
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    debugHCatImportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
+    IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
+      SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
+    hiveDelimiters = new DelimiterSet((char) delimChars[0].get(),
+      (char) delimChars[1].get(), (char) delimChars[2].get(),
+      (char) delimChars[3].get(), delimChars[4].get() == 1 ? true : false);
+    hiveDelimsReplacement = conf
+      .get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
+    if (hiveDelimsReplacement == null) {
+      hiveDelimsReplacement = "";
+    }
+    doHiveDelimsReplacement = Boolean.valueOf(conf
+      .get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
+
+    IntWritable[] fPos = DefaultStringifier.loadArray(conf,
+      SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
+    hCatFieldPositions = new int[fPos.length];
+    for (int i = 0; i < fPos.length; ++i) {
+      hCatFieldPositions[i] = fPos[i].get();
+    }
+
+    LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
+    LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
+    LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
+    staticPartitionKey = conf
+      .get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
+    LOG.debug("Static partition key used : " + staticPartitionKey);
+  }
+
+  public HCatRecord convertToHCatRecord(SqoopRecord sqr) throws IOException,
+    InterruptedException {
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      sqr.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+    if (colCount == -1) {
+      colCount = sqr.getFieldMap().size();
+    }
+
+    Map<String, Object> fieldMap = sqr.getFieldMap();
+    HCatRecord result = new DefaultHCatRecord(fieldCount);
+
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      String key = entry.getKey();
+      Object val = entry.getValue();
+      String hfn = key.toLowerCase();
+      if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
+        continue;
+      }
+      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+      if (debugHCatImportMapper) {
+        LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
+          + " of type " + (val == null ? null : val.getClass().getName())
+          + ", hcattype " + hfs.getTypeString());
+      }
+      Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
+
+      result.set(hfn, hCatFullTableSchema, hCatVal);
+    }
+
+    return result;
+  }
+
+  private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
+    String hCatTypeString) {
+
+    if (val == null) {
+      return null;
+    }
+
+    Object retVal = null;
+
+    if (val instanceof Number) {
+      retVal = convertNumberTypes(val, hfsType);
+    } else if (val instanceof Boolean) {
+      retVal = convertBooleanTypes(val, hfsType);
+    } else if (val instanceof String) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        String str = (String) val;
+        if (doHiveDelimsReplacement) {
+          retVal = FieldFormatter.hiveStringReplaceDelims(str,
+            hiveDelimsReplacement, hiveDelimiters);
+        } else {
+          retVal = str;
+        }
+      }
+    } else if (val instanceof java.util.Date) {
+      retVal = converDateTypes(val, hfsType);
+    } else if (val instanceof BytesWritable) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BytesWritable bw = (BytesWritable) val;
+        retVal = bw.getBytes();
+      }
+    } else if (val instanceof BlobRef) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BlobRef br = (BlobRef) val;
+        byte[] bytes = br.isExternal() ? br.toString().getBytes() : br
+          .getData();
+        retVal = bytes;
+      }
+    } else if (val instanceof ClobRef) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        ClobRef cr = (ClobRef) val;
+        String s = cr.isExternal() ? cr.toString() : cr.getData();
+        retVal = s;
+      }
+    } else {
+      throw new UnsupportedOperationException("Objects of type "
+        + val.getClass().getName() + " are not supported");
+    }
+    if (retVal == null) {
+      LOG.error("Objects of type " + val.getClass().getName()
+        + " can not be mapped to HCatalog type " + hCatTypeString);
+    }
+    return retVal;
+  }
+
+  private Object converDateTypes(Object val, HCatFieldSchema.Type hfsType) {
+    if (val instanceof java.sql.Date) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Date) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Time) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Time) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Timestamp) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Timestamp) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val, HCatFieldSchema.Type hfsType) {
+    Boolean b = (Boolean) val;
+    if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return b;
+    } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return (byte) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return (short) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return (int) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return (long) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return (float) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return (double) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val, HCatFieldSchema.Type hfsType) {
+    if (!(val instanceof Number)) {
+      return null;
+    }
+    if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
+      BigDecimal bd = (BigDecimal) val;
+      if (bigDecimalFormatString) {
+        return bd.toPlainString();
+      } else {
+        return bd.toString();
+      }
+    }
+    Number n = (Number) val;
+    if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return n.byteValue();
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return n.shortValue();
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return n.intValue();
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return n.longValue();
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return n.floatValue();
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return n.doubleValue();
+    } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return n.toString();
+    }
+    return null;
+  }
+
+  public void cleanup() throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+}
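
Note on the conversions above: the Number and Boolean branches of
convertNumberTypes/convertBooleanTypes and the BigDecimal handling rely only on
plain java.lang/java.math semantics, so their effect can be sketched outside of
a MapReduce job. The snippet below is a standalone illustration, not part of
this patch; the class and method names (ConversionDemo, booleanToHCat,
bigDecimalToString) are hypothetical.

import java.math.BigDecimal;

/** Standalone sketch of the value conversions performed by the helper. */
public class ConversionDemo {

  /** Mirrors the BOOLEAN -> TINYINT/INT/STRING branches of convertBooleanTypes. */
  static Object booleanToHCat(Boolean b, String hcatType) {
    if ("tinyint".equals(hcatType)) {
      return (byte) (b ? 1 : 0);
    } else if ("int".equals(hcatType)) {
      return (int) (b ? 1 : 0);
    } else if ("string".equals(hcatType)) {
      return b.toString();
    }
    return null;
  }

  /** Mirrors the BigDecimal -> STRING branch of convertNumberTypes. */
  static String bigDecimalToString(BigDecimal bd, boolean bigDecimalFormatString) {
    // With the bigdecimal format-string option enabled
    // (ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT), 1E+3 is rendered as 1000
    // instead of the scientific form produced by BigDecimal.toString().
    return bigDecimalFormatString ? bd.toPlainString() : bd.toString();
  }

  public static void main(String[] args) {
    System.out.println(booleanToHCat(Boolean.TRUE, "tinyint"));            // 1
    System.out.println(bigDecimalToString(new BigDecimal("1E+3"), true));  // 1000
    System.out.println(bigDecimalToString(new BigDecimal("1E+3"), false)); // 1E+3
  }
}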

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
index 4f0ff1b..2d4830a 100644
--- a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
@@ -19,41 +19,15 @@
 package org.apache.sqoop.mapreduce.hcat;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DefaultStringifier;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.sqoop.lib.SqoopRecord;
-import org.apache.sqoop.mapreduce.ImportJobBase;
 import org.apache.sqoop.mapreduce.SqoopMapper;
 
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
-import com.cloudera.sqoop.lib.DelimiterSet;
-import com.cloudera.sqoop.lib.FieldFormatter;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-
 /**
  * A mapper for HCatalog import.
  */
@@ -62,282 +36,25 @@ public class SqoopHCatImportMapper extends
   WritableComparable, HCatRecord> {
   public static final Log LOG = LogFactory
     .getLog(SqoopHCatImportMapper.class.getName());
-
-  private static boolean debugHCatImportMapper = false;
-
-  private InputJobInfo jobInfo;
-  private HCatSchema hCatFullTableSchema;
-  private int fieldCount;
-  private boolean bigDecimalFormatString;
-  private LargeObjectLoader lobLoader;
-  private HCatSchema partitionSchema = null;
-  private HCatSchema dataColsSchema = null;
-  private String stringDelimiterReplacements = null;
-  private ArrayWritable delimCharsArray;
-  private String hiveDelimsReplacement;
-  private boolean doHiveDelimsReplacement = false;
-  private DelimiterSet hiveDelimiters;
-  private String staticPartitionKey;
-  private int[] hCatFieldPositions;
-  private int colCount;
+  private SqoopHCatImportHelper helper;
 
   @Override
   protected void setup(Context context)
     throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
-    jobInfo =
-      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
-    dataColsSchema = jobInfo.getTableInfo().getDataColumns();
-    partitionSchema =
-      jobInfo.getTableInfo().getPartitionColumns();
-    StringBuilder storerInfoStr = new StringBuilder(1024);
-    StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
-    storerInfoStr.append("HCatalog Storer Info : ")
-      .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
-      .append("\n\tInput format class = ").append(storerInfo.getIfClass())
-      .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
-      .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
-    Properties storerProperties = storerInfo.getProperties();
-    if (!storerProperties.isEmpty()) {
-      storerInfoStr.append("\nStorer properties ");
-      for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
-        String key = (String) entry.getKey();
-        Object val = entry.getValue();
-        storerInfoStr.append("\n\t").append(key).append('=').append(val);
-      }
-    }
-    storerInfoStr.append("\n");
-    LOG.info(storerInfoStr);
-
-    hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
-    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
-      hCatFullTableSchema.append(hfs);
-    }
-    fieldCount = hCatFullTableSchema.size();
-    lobLoader = new LargeObjectLoader(conf,
-      new Path(jobInfo.getTableInfo().getTableLocation()));
-    bigDecimalFormatString = conf.getBoolean(
-      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
-      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
-    debugHCatImportMapper = conf.getBoolean(
-      SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
-    IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
-        SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
-    hiveDelimiters = new DelimiterSet(
-      (char) delimChars[0].get(), (char) delimChars[1].get(),
-      (char) delimChars[2].get(), (char) delimChars[3].get(),
-      delimChars[4].get() == 1 ? true : false);
-    hiveDelimsReplacement =
-      conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
-    if (hiveDelimsReplacement == null) {
-      hiveDelimsReplacement = "";
-    }
-    doHiveDelimsReplacement = Boolean.valueOf(conf.get(
-      SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
-
-    IntWritable[] fPos = DefaultStringifier.loadArray(conf,
-        SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
-    hCatFieldPositions = new int[fPos.length];
-    for (int i = 0; i < fPos.length; ++i) {
-      hCatFieldPositions[i] = fPos[i].get();
-    }
-
-    LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
-    LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
-    LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
-    staticPartitionKey =
-      conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
-    LOG.debug("Static partition key used : " + staticPartitionKey);
-
-
+    helper = new SqoopHCatImportHelper(conf);
   }
 
   @Override
   public void map(WritableComparable key, SqoopRecord value,
     Context context)
     throws IOException, InterruptedException {
-
-    try {
-      // Loading of LOBs was delayed until we have a Context.
-      value.loadLargeObjects(lobLoader);
-    } catch (SQLException sqlE) {
-      throw new IOException(sqlE);
-    }
-    if (colCount == -1) {
-      colCount = value.getFieldMap().size();
-    }
-    context.write(key, convertToHCatRecord(value));
+    context.write(key, helper.convertToHCatRecord(value));
   }
 
   @Override
   protected void cleanup(Context context) throws IOException {
-    if (null != lobLoader) {
-      lobLoader.close();
-    }
+    helper.cleanup();
   }
 
-  private HCatRecord convertToHCatRecord(SqoopRecord sqr)
-    throws IOException {
-    Map<String, Object> fieldMap = sqr.getFieldMap();
-    HCatRecord result = new DefaultHCatRecord(fieldCount);
-
-    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      String key = entry.getKey();
-      Object val = entry.getValue();
-      String hfn = key.toLowerCase();
-      if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
-        continue;
-      }
-      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
-      if (debugHCatImportMapper) {
-        LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
-          + " of type " + (val == null ? null : val.getClass().getName())
-          + ", hcattype " + hfs.getTypeString());
-      }
-      Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
-
-      result.set(hfn, hCatFullTableSchema, hCatVal);
-    }
-
-    return result;
-  }
-
-
-  private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
-    String hCatTypeString) {
-
-    if (val == null) {
-      return null;
-    }
-
-    Object retVal = null;
-
-    if (val instanceof Number) {
-      retVal = convertNumberTypes(val, hfsType);
-    } else if (val instanceof Boolean) {
-      retVal = convertBooleanTypes(val, hfsType);
-    } else if (val instanceof String) {
-      if (hfsType == HCatFieldSchema.Type.STRING) {
-        String str = (String) val;
-        if (doHiveDelimsReplacement) {
-          retVal = FieldFormatter
-            .hiveStringReplaceDelims(str, hiveDelimsReplacement,
-                hiveDelimiters);
-        } else {
-          retVal = str;
-        }
-      }
-    } else if (val instanceof java.util.Date) {
-      retVal = converDateTypes(val, hfsType);
-    } else if (val instanceof BytesWritable) {
-      if (hfsType == HCatFieldSchema.Type.BINARY) {
-        BytesWritable bw = (BytesWritable) val;
-        retVal = bw.getBytes();
-      }
-    } else if (val instanceof BlobRef) {
-      if (hfsType == HCatFieldSchema.Type.BINARY) {
-        BlobRef br = (BlobRef) val;
-        byte[] bytes = br.isExternal() ? br.toString().getBytes()
-          : br.getData();
-        retVal = bytes;
-      }
-    } else if (val instanceof ClobRef) {
-      if (hfsType == HCatFieldSchema.Type.STRING) {
-        ClobRef cr = (ClobRef) val;
-        String s = cr.isExternal() ? cr.toString() : cr.getData();
-        retVal = s;
-      }
-    } else {
-      throw new UnsupportedOperationException("Objects of type "
-        + val.getClass().getName() + " are not suported");
-    }
-    if (retVal == null) {
-      LOG.error("Objects of type "
-        + val.getClass().getName() + " can not be mapped to HCatalog type "
-        + hCatTypeString);
-    }
-    return retVal;
-  }
-
-  private Object converDateTypes(Object val,
-    HCatFieldSchema.Type hfsType) {
-    if (val instanceof java.sql.Date) {
-      if (hfsType == HCatFieldSchema.Type.BIGINT) {
-        return ((Date) val).getTime();
-      } else if (hfsType == HCatFieldSchema.Type.STRING) {
-        return val.toString();
-      }
-    } else if (val instanceof java.sql.Time) {
-      if (hfsType == HCatFieldSchema.Type.BIGINT) {
-        return ((Time) val).getTime();
-      } else if (hfsType == HCatFieldSchema.Type.STRING) {
-        return val.toString();
-      }
-    } else if (val instanceof java.sql.Timestamp) {
-      if (hfsType == HCatFieldSchema.Type.BIGINT) {
-        return ((Timestamp) val).getTime();
-      } else if (hfsType == HCatFieldSchema.Type.STRING) {
-        return val.toString();
-      }
-    }
-    return null;
-  }
-
-  private Object convertBooleanTypes(Object val,
-    HCatFieldSchema.Type hfsType) {
-    Boolean b = (Boolean) val;
-    if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
-      return b;
-    } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
-      return (byte) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
-      return (short) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.INT) {
-      return (int) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
-      return (long) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
-      return (float) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
-      return (double) (b ? 1 : 0);
-    } else if (hfsType == HCatFieldSchema.Type.STRING) {
-      return val.toString();
-    }
-    return null;
-  }
-
-  private Object convertNumberTypes(Object val,
-    HCatFieldSchema.Type hfsType) {
-    if (!(val instanceof Number)) {
-      return null;
-    }
-    if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
-      BigDecimal bd = (BigDecimal) val;
-      if (bigDecimalFormatString) {
-        return bd.toPlainString();
-      } else {
-        return bd.toString();
-      }
-    }
-    Number n = (Number) val;
-    if (hfsType == HCatFieldSchema.Type.TINYINT) {
-      return n.byteValue();
-    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
-      return n.shortValue();
-    } else if (hfsType == HCatFieldSchema.Type.INT) {
-      return n.intValue();
-    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
-      return n.longValue();
-    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
-      return n.floatValue();
-    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
-      return n.doubleValue();
-    } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
-      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
-    } else if (hfsType == HCatFieldSchema.Type.STRING) {
-      return n.toString();
-    }
-    return null;
-  }
 }
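
With the conversion logic moved into SqoopHCatImportHelper, any mapper that is
handed SqoopRecord instances can reuse it: the rewritten SqoopHCatImportMapper
above reduces to setup/map/cleanup calls on the helper, and the Netezza
external-table HCat mappers added by this commit follow the same pattern. A
minimal sketch of that pattern is shown below; the class name
ExampleHCatImportMapper is hypothetical, and it assumes the same Sqoop, Hadoop
and HCatalog dependencies used elsewhere in this patch.

package org.apache.sqoop.mapreduce.hcat;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.SqoopMapper;

/** Hypothetical mapper showing how the shared import helper is reused. */
public class ExampleHCatImportMapper extends
  SqoopMapper<WritableComparable, SqoopRecord, WritableComparable, HCatRecord> {

  private SqoopHCatImportHelper helper;

  @Override
  protected void setup(Context context)
    throws IOException, InterruptedException {
    // All HCatalog-specific state (schemas, delimiters, LOB loader) is
    // built by the helper from the job configuration.
    helper = new SqoopHCatImportHelper(context.getConfiguration());
  }

  @Override
  public void map(WritableComparable key, SqoopRecord value, Context context)
    throws IOException, InterruptedException {
    // Delegate the SqoopRecord -> HCatRecord conversion to the helper.
    context.write(key, helper.convertToHCatRecord(value));
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    helper.cleanup();
  }
}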

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06183f7a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
index 7caf9be..117cc3f 100644
--- a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
@@ -24,22 +24,25 @@ import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.sqoop.lib.DelimiterSet;
-import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.DirectNetezzaManager;
-import org.apache.sqoop.mapreduce.DBWritable;
+import
+  org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableHCatExportMapper;
 import
   org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableRecordExportMapper;
 import
   org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableTextExportMapper;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 
+import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
  * Class that runs an export job using netezza external tables in the mapper.
@@ -79,49 +82,73 @@ public class NetezzaExternalTableExportJob extends ExportJobBase {
     conf.setBoolean(DelimiterSet.INPUT_ENCLOSE_REQUIRED_KEY,
         options.isOutputEncloseRequired());
   }
-  /**
-   * Configure the inputformat to use for the job.
-   */
-  @Override
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol) throws ClassNotFoundException,
-      IOException {
-
-    // Configure the delimiters, etc.
-    Configuration conf = job.getConfiguration();
 
-    ConnManager mgr = context.getConnManager();
-    String username = options.getUsername();
-    if (null == username || username.length() == 0) {
-      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
-          options.getConnectString());
-    } else {
-      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
-          options.getConnectString(), username, options.getPassword());
+  @Override
+  protected Class<? extends InputFormat> getInputFormatClass()
+      throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
     }
+    return super.getInputFormatClass();
+  }
 
-    String[] colNames = options.getColumns();
-    if (null == colNames) {
-      colNames = mgr.getColumnNames(tableName);
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+     String tableClassName, String splitCol)
+     throws ClassNotFoundException, IOException {
+    super.configureInputFormat(job, tableName, tableClassName, splitCol);
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+          context.getConnManager(), tableName, job.getConfiguration());
+      return;
     }
+  }
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+                                       String tableClassName)
+      throws ClassNotFoundException, IOException {
+    ConnManager mgr = context.getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            options.getConnectionParams());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword(),
+            options.getConnectionParams());
+      }
 
-    String[] sqlColNames = null;
-    if (null != colNames) {
-      sqlColNames = new String[colNames.length];
-      for (int i = 0; i < colNames.length; i++) {
-        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
       }
-    }
 
-    DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, null,
-        null, sqlColNames);
+      if (mgr.escapeTableNameOnExport()) {
+        DBOutputFormat.setOutput(job, mgr.escapeTableName(tableName), colNames);
+      } else {
+        DBOutputFormat.setOutput(job, tableName, colNames);
+      }
 
-    // Configure the actual InputFormat to use.
-    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+      job.setOutputFormatClass(getOutputFormatClass());
+      if (isHCatJob) {
+        job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY,
+          tableClassName);
+      }
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
   }
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
+    if (isHCatJob) {
+      return NetezzaExternalTableHCatExportMapper.class;
+    }
     if (inputIsSequenceFiles()) {
       return NetezzaExternalTableRecordExportMapper.class;
     } else {