Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/06/29 18:03:18 UTC

[7/7] git commit: SQOOP-1287: Add high performance Oracle connector into Sqoop (David Robson via Venkat Ranganathan)

SQOOP-1287: Add high performance Oracle connector into Sqoop
  (David Robson via Venkat Ranganathan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6bfaa9d6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6bfaa9d6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6bfaa9d6

Branch: refs/heads/trunk
Commit: 6bfaa9d65395cf1cc365a4a6b9df492550c5c847
Parents: d03faf3
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Sun Jun 29 09:02:05 2014 -0700
Committer: Venkat Ranganathan <ve...@hortonworks.com>
Committed: Sun Jun 29 09:02:05 2014 -0700

----------------------------------------------------------------------
 COMPILING.txt                                   |    8 +-
 build.xml                                       |    5 +
 conf/oraoop-site-template.xml                   |  103 ++
 src/java/org/apache/sqoop/ConnFactory.java      |   10 +-
 .../sqoop/manager/oracle/OraOopConnManager.java |  630 +++++++
 .../sqoop/manager/oracle/OraOopConstants.java   |  512 ++++++
 .../manager/oracle/OraOopDBInputSplit.java      |  195 ++
 .../manager/oracle/OraOopDBRecordReader.java    |  468 +++++
 .../oracle/OraOopDataDrivenDBInputFormat.java   |  359 ++++
 .../sqoop/manager/oracle/OraOopGenerics.java    |   64 +
 .../sqoop/manager/oracle/OraOopJdbcUrl.java     |  232 +++
 .../apache/sqoop/manager/oracle/OraOopLog.java  |  235 +++
 .../sqoop/manager/oracle/OraOopLogFactory.java  |   54 +
 .../sqoop/manager/oracle/OraOopLogMessage.java  |   61 +
 .../manager/oracle/OraOopManagerFactory.java    | 1126 ++++++++++++
 .../manager/oracle/OraOopOracleDataChunk.java   |   68 +
 .../oracle/OraOopOracleDataChunkExtent.java     |   93 +
 .../oracle/OraOopOracleDataChunkPartition.java  |   78 +
 .../manager/oracle/OraOopOracleQueries.java     | 1687 ++++++++++++++++++
 .../manager/oracle/OraOopOutputFormatBase.java  |  713 ++++++++
 .../oracle/OraOopOutputFormatInsert.java        |  263 +++
 .../oracle/OraOopOutputFormatUpdate.java        |  418 +++++
 .../sqoop/manager/oracle/OraOopUtilities.java   | 1461 +++++++++++++++
 .../manager/oracle/OracleActiveInstance.java    |   44 +
 .../manager/oracle/OracleConnectionFactory.java |  217 +++
 .../sqoop/manager/oracle/OracleTable.java       |   68 +
 .../sqoop/manager/oracle/OracleTableColumn.java |   59 +
 .../manager/oracle/OracleTableColumns.java      |   43 +
 .../manager/oracle/OracleTablePartition.java    |   50 +
 .../manager/oracle/OracleTablePartitions.java   |   62 +
 .../sqoop/manager/oracle/OracleVersion.java     |   84 +
 .../com/cloudera/sqoop/manager/OracleUtils.java |   10 +
 .../sqoop/testutil/BaseSqoopTestCase.java       |    2 +
 .../sqoop/testutil/ExportJobTestCase.java       |    2 +
 .../sqoop/testutil/ImportJobTestCase.java       |    4 +
 src/test/oraoop/create_users.sql                |   49 +
 src/test/oraoop/pkg_tst_product_gen.pbk         |  126 ++
 src/test/oraoop/pkg_tst_product_gen.psk         |   45 +
 src/test/oraoop/table_tst_product.xml           |   90 +
 src/test/oraoop/table_tst_product_part.xml      |  103 ++
 .../oraoop/table_tst_product_special_chars.xml  |   90 +
 src/test/oraoop/table_tst_product_subpart.xml   |  105 ++
 .../apache/sqoop/manager/oracle/ExportTest.java |   68 +
 .../apache/sqoop/manager/oracle/ImportTest.java |  241 +++
 .../manager/oracle/OraOopOracleQueriesTest.java |   54 +
 .../sqoop/manager/oracle/OraOopTestCase.java    |  321 ++++
 .../manager/oracle/OraOopTestConstants.java     |   62 +
 .../oracle/OracleConnectionFactoryTest.java     |  520 ++++++
 .../sqoop/manager/oracle/SystemImportTest.java  |  315 ++++
 .../TestOraOopDataDrivenDBInputFormat.java      |  131 ++
 .../sqoop/manager/oracle/TestOraOopJdbcUrl.java |  276 +++
 .../manager/oracle/TestOraOopUtilities.java     |  619 +++++++
 .../sqoop/manager/oracle/TestOracleTable.java   |   42 +
 .../sqoop/manager/oracle/TimestampDataTest.java |   51 +
 .../oracle/util/BigDecimalGenerator.java        |   57 +
 .../oracle/util/BinaryDoubleGenerator.java      |   32 +
 .../oracle/util/BinaryFloatGenerator.java       |   32 +
 .../manager/oracle/util/BlobGenerator.java      |  103 ++
 .../manager/oracle/util/BytesGenerator.java     |   52 +
 .../manager/oracle/util/CharGenerator.java      |   54 +
 .../manager/oracle/util/FloatGenerator.java     |   57 +
 .../sqoop/manager/oracle/util/HadoopFiles.java  |   37 +
 .../oracle/util/IntervalDaySecondGenerator.java |   64 +
 .../oracle/util/IntervalYearMonthGenerator.java |   50 +
 .../manager/oracle/util/NCharGenerator.java     |   54 +
 .../oracle/util/OraOopTestDataGenerator.java    |   67 +
 .../manager/oracle/util/OraOopTestUtils.java    |   60 +
 .../sqoop/manager/oracle/util/OracleData.java   |  192 ++
 .../oracle/util/OracleDataDefinition.java       |   66 +
 .../oracle/util/OracleTableDefinition.java      |  150 ++
 .../manager/oracle/util/RowIdGenerator.java     |   64 +
 .../manager/oracle/util/TimestampGenerator.java |   71 +
 .../sqoop/manager/oracle/util/URIGenerator.java |   57 +
 73 files changed, 14112 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/COMPILING.txt
----------------------------------------------------------------------
diff --git a/COMPILING.txt b/COMPILING.txt
index ddbed30..ae28411 100644
--- a/COMPILING.txt
+++ b/COMPILING.txt
@@ -85,9 +85,12 @@ jdbc:mysql://localhost/
 
 === Oracle
 
-Install Oracle XE (Express edition) 10.2.0. Instructions for configuring the
+Install Oracle Enterprise Edition 10.2.0+. Instructions for configuring the
 database are in OracleManagerTest. Download the ojdbc6_g jar.
 
+If running the tests against Oracle XE (Express Edition), many of them will
+fail because XE does not include the partitioning feature.
+
 Use the system property sqoop.test.oracle.connectstring to specify the
 connection string for Oracle host used for testing. Specify this property on the
 command line or via the build.properties file. For example:
@@ -97,6 +100,9 @@ sqoop.test.oracle.connectstring=jdbc:oracle:thin:@//host.example.com/xe
 If not specified, the default value used for this property is:
 jdbc:oracle:thin:@//localhost/xe
 
+Users sqooptest and sqooptest2 should be created prior to running the tests.
+A SQL script is available in src/test/oraoop/create_users.sql
+
 === PostgreSQL
 
 Install PostgreSQL 8.3.9. Download the postgresql 8.4 jdbc driver. Instructions

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2dc99a8..ec5d2fa 100644
--- a/build.xml
+++ b/build.xml
@@ -827,6 +827,11 @@
     <mkdir dir="${cobertura.class.dir}" />
     <copy file="${test.dir}/fi-site.xml"
             todir="${test.build.extraconf}" />
+    <copy file="${basedir}/conf/oraoop-site-template.xml"
+            todir="${test.build.extraconf}" />
+    <copy todir="${test.build.extraconf}/oraoop">
+      <fileset dir="${test.dir}/oraoop"/>
+    </copy>
     <junit
       printsummary="yes" showoutput="${test.output}"
       haltonfailure="no" fork="yes" maxmemory="512m"

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/conf/oraoop-site-template.xml
----------------------------------------------------------------------
diff --git a/conf/oraoop-site-template.xml b/conf/oraoop-site-template.xml
new file mode 100644
index 0000000..eac9796
--- /dev/null
+++ b/conf/oraoop-site-template.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<!-- Put OraOop-specific properties in this file. -->
+
+<configuration>
+
+  <property>
+    <name>oraoop.oracle.session.initialization.statements</name>
+    <value>alter session disable parallel query;
+           alter session set "_serial_direct_read"=true;
+           alter session set tracefile_identifier=oraoop;
+           --alter session set events '10046 trace name context forever, level 8';
+    </value>
+    <description>A semicolon-delimited list of Oracle statements that are executed, in order, to initialize each Oracle session.
+                 Use {[property_name]|[default_value]} characters to refer to a Sqoop/Hadoop configuration property.
+                 If the property does not exist, the specified default value will be used.
+                 E.g. {oracle.sessionTimeZone|GMT} will equate to the value of the property named "oracle.sessionTimeZone" or
+                 to "GMT" if this property has not been set.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.map.tasks.speculative.execution</name>
+    <value>false</value>
+    <description>Speculative execution is disabled to prevent redundant load on the Oracle database.
+    </description>
+  </property>
+
+  <property>
+    <name>oraoop.import.hint</name>
+    <value>NO_INDEX(t)</value>
+    <description>Hint to add to the SELECT statement for an IMPORT job.
+                 The table will have an alias of t which can be used in the hint.
+                 By default the NO_INDEX hint is applied to stop the use of an index.
+                 To override this in oraoop-site.xml set the value to a blank string.
+    </description>
+  </property>
+
+<!--
+  <property>
+    <name>oraoop.block.allocation</name>
+    <value>ROUNDROBIN</value>
+    <description>Supported values are: ROUNDROBIN or SEQUENTIAL or RANDOM.
+                 Refer to the OraOop documentation for more details.
+    </description>
+  </property>
+-->
+
+<!--
+  <property>
+    <name>oraoop.import.omit.lobs.and.long</name>
+    <value>false</value>
+    <description>If true, OraOop will omit BLOB, CLOB, NCLOB and LONG columns during an Import.
+    </description>
+  </property>
+-->
+
+<!--
+  <property>
+    <name>oraoop.table.import.where.clause.location</name>
+    <value>SUBSPLIT</value>
+    <description>Supported values are: SUBSPLIT or SPLIT.
+                 Refer to the OraOop documentation for more details.
+    </description>
+  </property>
+-->
+
+<!--
+  <property>
+    <name>oraoop.oracle.append.values.hint.usage</name>
+    <value>AUTO</value>
+    <description>Supported values are: AUTO or ON or OFF.
+                 ON:
+                     OraOop will use the APPEND_VALUES Oracle hint during a Sqoop export, when inserting
+                     data into an Oracle table.
+                 OFF:
+                     OraOop will not use the APPEND_VALUES Oracle hint during a Sqoop export.
+                 AUTO:
+                     For OraOop 1.1, the AUTO setting will not use the APPEND_VALUES hint.
+    </description>
+  </property>
+-->
+
+</configuration>
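
For illustration, the {[property_name]|[default_value]} substitution described
in oraoop.oracle.session.initialization.statements lets an initialization
statement pick up a per-job setting. A minimal sketch, assuming an
oraoop-site.xml override (the time-zone statement itself is an illustrative
assumption, not part of the shipped template):

  <!-- Sketch only: e.g. in oraoop-site.xml -->
  <property>
    <name>oraoop.oracle.session.initialization.statements</name>
    <value>alter session disable parallel query;
           alter session set time_zone = '{oracle.sessionTimeZone|GMT}';
    </value>
  </property>

Per the description above, {oracle.sessionTimeZone|GMT} resolves to the value
of the oracle.sessionTimeZone property, or to GMT if that property is not set.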

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/ConnFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/ConnFactory.java b/src/java/org/apache/sqoop/ConnFactory.java
index 61d3307..2276525 100644
--- a/src/java/org/apache/sqoop/ConnFactory.java
+++ b/src/java/org/apache/sqoop/ConnFactory.java
@@ -43,6 +43,7 @@ import com.cloudera.sqoop.metastore.JobData;
 
 import com.cloudera.sqoop.util.ClassLoaderStack;
 import org.apache.sqoop.manager.GenericJdbcManager;
+import org.apache.sqoop.manager.oracle.OraOopManagerFactory;
 
 /**
  * Factory class to create the ConnManager type required
@@ -70,8 +71,12 @@ public class ConnFactory {
 
   // The default value for sqoop.connection.factories is the
   // name of the DefaultManagerFactory.
+  public static final String[] DEFAULT_FACTORY_CLASS_NAMES_ARR =
+      {OraOopManagerFactory.class.getName(),
+      DefaultManagerFactory.class.getName(), };
+
   public static final String DEFAULT_FACTORY_CLASS_NAMES =
-      DefaultManagerFactory.class.getName();
+      StringUtils.arrayToString(DEFAULT_FACTORY_CLASS_NAMES_ARR);
 
   /** The list of ManagerFactory instances consulted by getManager().
    */
@@ -84,7 +89,8 @@ public class ConnFactory {
   private void instantiateFactories(Configuration conf) {
     loadManagersFromConfDir(conf);
     String [] classNameArray =
-        conf.getStrings(FACTORY_CLASS_NAMES_KEY, DEFAULT_FACTORY_CLASS_NAMES);
+        conf.getStrings(FACTORY_CLASS_NAMES_KEY,
+            DEFAULT_FACTORY_CLASS_NAMES_ARR);
 
     for (String className : classNameArray) {
       try {
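
The effect of this change is that OraOopManagerFactory is consulted ahead of
DefaultManagerFactory whenever sqoop.connection.factories is left at its
default. The property still accepts an explicit comma-separated list of
factory class names (matching the conf.getStrings() call above); as a sketch
only, with the DefaultManagerFactory package being an assumption to verify
against the build in use, restoring the old single-factory behaviour might
look like:

  <property>
    <name>sqoop.connection.factories</name>
    <value>org.apache.sqoop.manager.DefaultManagerFactory</value>
  </property>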

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
new file mode 100644
index 0000000..302849c
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.manager.GenericJdbcManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * OraOop manager for high performance Oracle import / export.
+ * NOTES:
+ *   Escaping Column Names:
+ *   ----------------------
+ *   There are 3 main queries that occur during a Sqoop import.
+ *   (1) Selecting columns to obtain their data-type via getColTypesQuery();
+ *   (2) selecting column names via getColNamesQuery(); and
+ *   (3) getting the data during the import via
+ *       OraOopDBRecordReader.getSelectQuery();
+ *   In each of these queries, we'd ideally escape the column names so that
+ *   Oracle columns that require this work okay.
+ *   Unfortunately we can't do this, because if the user specifies column
+ *   names via the "--columns" clause, these names will be used (verbatim)
+ *   during OraOopDBRecordReader.getSelectQuery(). This means that we could
+ *   only escape the column names during OraOopDBRecordReader.getSelectQuery()
+ *   if the user entered them in the correct case.
+ *   Therefore, escapeColName() in this class does not actually do anything so
+ *   that OraOopDBRecordReader.getSelectQuery() generates a valid SQL statement
+ *   when the user utilises the "--columns" clause.
+ *   However, getColTypesQuery() and getColNamesQuery() do escape column names
+ *   via the method escapeOracleColumnName(). We also get getColumnTypes() to
+ *   unescape the column names so that Sqoop has the most accurate column
+ *   name strings.
+ */
+public class OraOopConnManager extends GenericJdbcManager {
+
+  public static final OraOopLog LOG = OraOopLogFactory
+      .getLog(OraOopConnManager.class.getName());
+  private List<String> columnNamesInOracleTable = null;
+  private Map<String, Integer> columnTypesInOracleTable = null;
+  private final String timestampJavaType;
+
+  public OraOopConnManager(final SqoopOptions sqoopOptions) {
+    super(OraOopConstants.ORACLE_JDBC_DRIVER_CLASS, sqoopOptions);
+    if (this.options.getConf().getBoolean(
+        OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING,
+        OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT)) {
+      timestampJavaType = "String";
+    } else {
+      timestampJavaType = super.toJavaType(Types.TIMESTAMP);
+    }
+  }
+
+  @Override
+  protected Connection makeConnection() throws SQLException {
+
+    String connectStr = this.options.getConnectString();
+    String username = this.options.getUsername();
+    String password = this.options.getPassword();
+    Properties additionalProps = this.options.getConnectionParams();
+
+    Connection connection =
+        OracleConnectionFactory.createOracleJdbcConnection(this
+            .getDriverClass(), connectStr, username, password, additionalProps);
+
+    return connection;
+  }
+
+  @Override
+  public void close() throws SQLException {
+
+    super.close();
+  }
+
+  private List<String> getColumnNamesInOracleTable(String tableName) {
+
+    if (this.columnNamesInOracleTable == null) {
+
+      OracleTable tableContext = null;
+
+      try {
+        tableContext = getOracleTableContext();
+
+        Configuration conf = this.options.getConf();
+
+        this.columnNamesInOracleTable =
+            OraOopOracleQueries.getTableColumnNames(getConnection(),
+                tableContext, OraOopUtilities
+                    .omitLobAndLongColumnsDuringImport(conf), OraOopUtilities
+                    .recallSqoopJobType(conf), true, // <-
+                                                     // onlyOraOopSupportedTypes
+                true // <- omitOraOopPseudoColumns
+                );
+      } catch (SQLException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    // Return a copy of our list, so the original will not be inadvertently
+    // altered...
+    return OraOopUtilities.copyStringList(this.columnNamesInOracleTable);
+  }
+
+  protected List<String> getSelectedColumnNamesInOracleTable(String tableName) {
+
+    List<String> colNamesInTable = getColumnNamesInOracleTable(tableName);
+
+    String[] selectedColumns = this.options.getColumns();
+    if (selectedColumns != null && selectedColumns.length > 0) {
+
+      for (int idx = 0; idx < selectedColumns.length; idx++) {
+
+        String selectedColumn = selectedColumns[idx];
+        // If the user did not escape this column name, then we should
+        // uppercase it...
+        if (!isEscaped(selectedColumn)) {
+          selectedColumns[idx] = selectedColumn.toUpperCase();
+        } else {
+          // If the user escaped this column name, then we should
+          // retain its case...
+          selectedColumns[idx] = unescapeOracleColumnName(selectedColumn);
+        }
+      }
+
+      // Ensure there are no duplicated column names...
+      String[] duplicates =
+          OraOopUtilities
+              .getDuplicatedStringArrayValues(selectedColumns, false);
+      if (duplicates.length > 0) {
+        StringBuilder msg = new StringBuilder();
+        msg.append("The following column names have been duplicated in the ");
+        msg.append("\"--columns\" clause:\n");
+
+        for (String duplicate : duplicates) {
+          msg.append("\t" + duplicate + "\n");
+        }
+
+        throw new RuntimeException(msg.toString());
+      }
+
+      // Ensure the user selected column names that actually exist...
+      for (String selectedColumn : selectedColumns) {
+        if (!colNamesInTable.contains(selectedColumn)) {
+          OracleTable tableContext = getOracleTableContext();
+          throw new RuntimeException(String.format(
+              "The column named \"%s\" does not exist within the table"
+                  + "%s (or is of an unsupported data-type).", selectedColumn,
+              tableContext.toString()));
+        }
+      }
+
+      // Remove any columns (that exist in the table) that were not
+      // selected by the user...
+      for (int idx = colNamesInTable.size() - 1; idx >= 0; idx--) {
+        String colName = colNamesInTable.get(idx);
+        if (!OraOopUtilities.stringArrayContains(selectedColumns, colName,
+            false)) {
+          colNamesInTable.remove(idx);
+        }
+      }
+    }
+
+    // To assist development/testing of Oracle data-types, you can use this
+    // to limit the number of columns from the table...
+    int columnNameLimit =
+        this.options.getConf().getInt("oraoop.column.limit", 0);
+    if (columnNameLimit > 0) {
+      columnNameLimit = Math.min(columnNameLimit, colNamesInTable.size());
+      colNamesInTable = colNamesInTable.subList(0, columnNameLimit);
+    }
+
+    return colNamesInTable;
+  }
+
+  @Override
+  protected String getColTypesQuery(String tableName) {
+
+    List<String> colNames = getSelectedColumnNamesInOracleTable(tableName);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for (int idx = 0; idx < colNames.size(); idx++) {
+      if (idx > 0) {
+        sb.append(",");
+      }
+      sb.append(escapeOracleColumnName(colNames.get(idx))); // <- See notes at
+                                                            // top about escaped
+                                                            // column names
+    }
+    sb.append(String.format(" FROM %s WHERE 0=1", tableName));
+
+    return sb.toString();
+  }
+
+  @Override
+  protected String getColNamesQuery(String tableName) {
+
+    // NOTE: This code is similar to getColTypesQuery() - except the
+    // escaping of column names and table name differs.
+
+    List<String> colNames = getSelectedColumnNamesInOracleTable(tableName);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for (int idx = 0; idx < colNames.size(); idx++) {
+      if (idx > 0) {
+        sb.append(",");
+      }
+      sb.append(escapeColName(colNames.get(idx))); // <- See notes at top about
+                                                   // escaped column names
+    }
+    sb.append(String.format(" FROM %s WHERE 1=0", escapeTableName(tableName)));
+
+    return sb.toString();
+  }
+
+  @Override
+  protected String getSplitColumn(SqoopOptions opts, String tableName) {
+
+    // If we're importing an Oracle table and will be generating
+    // "splits" based on its Oracle data-files, we don't actually require
+    // a primary key to exist, or for the user to identify the split-column.
+    // As a consequence, return "NotRequired" to prevent sqoop code
+    // such as SqlManager.importTable() from throwing an exception.
+    //
+    // NB: The tableName parameter will be null if no table is involved,
+    // such as when importing data via an (arbitrary) SQL query.
+    if (tableName != null) {
+      return OraOopConstants.TABLE_SPLIT_COLUMN_NOT_REQUIRED;
+    } else {
+      return super.getSplitColumn(opts, tableName);
+    }
+  }
+
+  @Override
+  public void importTable(ImportJobContext context) throws IOException,
+      ImportException {
+
+    logImportTableDetails(context);
+
+    context.setConnManager(this);
+
+    // Specify the Oracle-specific DBInputFormat for import.
+    context.setInputFormat(OraOopDataDrivenDBInputFormat.class);
+
+    super.importTable(context);
+  }
+
+  @Override
+  public void exportTable(ExportJobContext context) throws IOException,
+      ExportException {
+
+    logExportTableDetails(context);
+
+    if (this.columnTypesInOracleTable == null) {
+      throw new ExportException("The column-types for the table are not"
+          + "known.");
+    }
+    if (this.columnTypesInOracleTable.containsValue(OraOopOracleQueries
+        .getOracleType("BINARY_DOUBLE"))) {
+      context.getOptions().getConf().setBoolean(
+          OraOopConstants.TABLE_CONTAINS_BINARY_DOUBLE_COLUMN, true);
+    }
+    if (this.columnTypesInOracleTable.containsValue(OraOopOracleQueries
+        .getOracleType("BINARY_FLOAT"))) {
+      context.getOptions().getConf().setBoolean(
+          OraOopConstants.TABLE_CONTAINS_BINARY_FLOAT_COLUMN, true);
+    }
+
+    context.setConnManager(this);
+
+    @SuppressWarnings("rawtypes")
+    Class<? extends OutputFormat> oraOopOutputFormatClass;
+    try {
+      oraOopOutputFormatClass = OraOopOutputFormatInsert.class;
+    } catch (NoClassDefFoundError ex) {
+      explainWhyExportClassCannotBeLoaded(ex, "OraOopOutputFormatInsert");
+      throw ex;
+    }
+    JdbcExportJob exportJob =
+        new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+    exportJob.runExport();
+  }
+
+  @Override
+  public void updateTable(ExportJobContext context) throws IOException,
+      ExportException {
+
+    logExportTableDetails(context);
+
+    context.setConnManager(this);
+
+    @SuppressWarnings("rawtypes")
+    Class<? extends OutputFormat> oraOopOutputFormatClass;
+    try {
+      oraOopOutputFormatClass = OraOopOutputFormatUpdate.class;
+    } catch (NoClassDefFoundError ex) {
+      explainWhyExportClassCannotBeLoaded(ex, "OraOopOutputFormatUpdate");
+      throw ex;
+    }
+
+    JdbcUpdateExportJob exportJob =
+        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+    exportJob.runExport();
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+
+    close();
+    super.finalize();
+  }
+
+  @Override
+  public String toHiveType(int sqlType) {
+
+    String hiveType = super.toHiveType(sqlType);
+
+    if (hiveType == null) {
+
+      // http://wiki.apache.org/hadoop/Hive/Tutorial#Primitive_Types
+
+      if (sqlType == OraOopOracleQueries.getOracleType("BFILE")
+          || sqlType == OraOopOracleQueries.getOracleType("INTERVALYM")
+          || sqlType == OraOopOracleQueries.getOracleType("INTERVALDS")
+          || sqlType == OraOopOracleQueries.getOracleType("NCLOB")
+          || sqlType == OraOopOracleQueries.getOracleType("NCHAR")
+          || sqlType == OraOopOracleQueries.getOracleType("NVARCHAR")
+          || sqlType == OraOopOracleQueries.getOracleType("OTHER")
+          || sqlType == OraOopOracleQueries.getOracleType("ROWID")
+          || sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPTZ")
+          || sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPLTZ")
+          || sqlType == OraOopOracleQueries.getOracleType("STRUCT")) {
+        hiveType = "STRING";
+      }
+
+      if (sqlType == OraOopOracleQueries.getOracleType("BINARY_FLOAT")) {
+        hiveType = "FLOAT";
+      }
+
+      if (sqlType == OraOopOracleQueries.getOracleType("BINARY_DOUBLE")) {
+        hiveType = "DOUBLE";
+      }
+    }
+
+    if (hiveType == null) {
+      LOG.warn(String.format("%s should be updated to cater for data-type: %d",
+          OraOopUtilities.getCurrentMethodName(), sqlType));
+    }
+
+    return hiveType;
+  }
+
+  @Override
+  public String toJavaType(int sqlType) {
+
+    String javaType = super.toJavaType(sqlType);
+
+    if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMP")) {
+      // Get the Oracle JDBC driver to convert this value to a string
+      // instead of the generic JDBC driver.
+      // If the generic JDBC driver is used, it will take into account the
+      // timezone of the client machine's locale. The problem with this is that
+      // timestamp data should not be associated with a timezone. In practice,
+      // this
+      // leads to problems, for example, the time '2010-10-03 02:01:00' being
+      // changed to '2010-10-03 03:01:00' if the client machine's locale is
+      // Melbourne.
+      // (This is in response to daylight saving starting in Melbourne on
+      // this date at 2am.)
+      javaType = timestampJavaType;
+    }
+
+    if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPTZ")) {
+      // Returning "String" produces: "2010-08-08 09:00:00.0 +10:00"
+      // Returning "java.sql.Timestamp" produces: "2010-08-08 09:00:00.0"
+
+      // If we use "java.sql.Timestamp", the field's value will not
+      // contain the timezone when converted to a string and written to the HDFS
+      // CSV file.
+      // I.e. Get the Oracle JDBC driver to convert this value to a string
+      // instead of the generic JDBC driver...
+      javaType = timestampJavaType;
+    }
+
+    if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPLTZ")) {
+      // Returning "String" produces:
+      // "2010-08-08 09:00:00.0 Australia/Melbourne"
+      // Returning "java.sql.Timestamp" produces: "2010-08-08 09:00:00.0"
+      javaType = timestampJavaType;
+    }
+
+    /*
+     * http://www.oracle.com/technology/sample_code/tech/java/sqlj_jdbc/files
+     * /oracle10g/ieee/Readme.html
+     *
+     * BINARY_DOUBLE is a 64-bit, double-precision floating-point number
+     * datatype. (IEEE 754) Each BINARY_DOUBLE value requires 9 bytes, including
+     * a length byte. A 64-bit double format number X is divided as sign s 1-bit
+     * exponent e 11-bits fraction f 52-bits
+     *
+     * BINARY_FLOAT is a 32-bit, single-precision floating-point number
+     * datatype. (IEEE 754) Each BINARY_FLOAT value requires 5 bytes, including
+     * a length byte. A 32-bit single format number X is divided as sign s 1-bit
+     * exponent e 8-bits fraction f 23-bits
+     */
+    if (sqlType == OraOopOracleQueries.getOracleType("BINARY_FLOAT")) {
+      // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+      javaType = "Float";
+    }
+
+    if (sqlType == OraOopOracleQueries.getOracleType("BINARY_DOUBLE")) {
+      // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+      javaType = "Double";
+    }
+
+    if (sqlType == OraOopOracleQueries.getOracleType("STRUCT")) {
+      // E.g. URITYPE
+      javaType = "String";
+    }
+
+    if (javaType == null) {
+
+      // For constant values, refer to:
+      // http://oracleadvisor.com/documentation/oracle/database/11.2/
+      //   appdev.112/e13995/constant-values.html#oracle_jdbc
+
+      if (sqlType == OraOopOracleQueries.getOracleType("BFILE")
+          || sqlType == OraOopOracleQueries.getOracleType("NCLOB")
+          || sqlType == OraOopOracleQueries.getOracleType("NCHAR")
+          || sqlType == OraOopOracleQueries.getOracleType("NVARCHAR")
+          || sqlType == OraOopOracleQueries.getOracleType("ROWID")
+          || sqlType == OraOopOracleQueries.getOracleType("INTERVALYM")
+          || sqlType == OraOopOracleQueries.getOracleType("INTERVALDS")
+          || sqlType == OraOopOracleQueries.getOracleType("OTHER")) {
+        javaType = "String";
+      }
+
+    }
+
+    if (javaType == null) {
+      LOG.warn(String.format("%s should be updated to cater for data-type: %d",
+          OraOopUtilities.getCurrentMethodName(), sqlType));
+    }
+
+    return javaType;
+  }
+
+  @Override
+  public String timestampToQueryString(Timestamp ts) {
+
+    return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+  }
+
+  public OracleTable getOracleTableContext() {
+
+    return OraOopUtilities.decodeOracleTableName(this.options.getUsername(),
+        this.options.getTableName(), this.options.getConf());
+  }
+
+  @Override
+  public Map<String, Integer> getColumnTypes(String tableName) {
+
+    if (this.columnTypesInOracleTable == null) {
+
+      Map<String, Integer> columnTypes = super.getColumnTypes(tableName);
+      this.columnTypesInOracleTable = new HashMap<String, Integer>();
+
+      List<String> colNames = getColumnNamesInOracleTable(tableName);
+
+      for (int idx = 0; idx < colNames.size(); idx++) {
+
+        String columnNameInTable = colNames.get(idx);
+        if (columnTypes.containsKey(columnNameInTable)) {
+
+          // Unescape the column names being returned...
+          int colType = columnTypes.get(columnNameInTable);
+          String key = unescapeOracleColumnName(columnNameInTable); // <- See
+                                                                    // notes at
+                                                                    // top about
+                                                                    // escaped
+                                                                    // column
+                                                                    // names
+          this.columnTypesInOracleTable.put(key, colType);
+        }
+      }
+    }
+
+    return this.columnTypesInOracleTable;
+  }
+
+  private boolean isEscaped(String name) {
+
+    return name.startsWith("\"") && name.endsWith("\"");
+  }
+
+  private String escapeOracleColumnName(String columnName) {
+    // See notes at top about escaped column names
+    if (isEscaped(columnName)) {
+      return columnName;
+    } else {
+      return "\"" + columnName + "\"";
+    }
+  }
+
+  @Override
+  public String escapeColName(String colName) {
+
+    return super.escapeColName(colName); // <- See notes at top about escaped
+                                         // column names
+  }
+
+  private String unescapeOracleColumnName(String columnName) {
+
+    if (isEscaped(columnName)) {
+      return columnName.substring(1, columnName.length() - 1);
+    } else {
+      return columnName;
+    }
+  }
+
+  private void logImportTableDetails(ImportJobContext context) {
+
+    Path outputDirectory = context.getDestination();
+    if (outputDirectory != null) {
+      LOG.debug("The output directory for the sqoop table import is : "
+          + outputDirectory.getName());
+    }
+
+    // Indicate whether we can load the class named: OraOopOraStats
+    showUserWhetherOraOopOraStatsIsAvailable(context.getOptions().getConf());
+  }
+
+  private void logExportTableDetails(ExportJobContext context) {
+
+    // Indicate whether we can load the class named: OraOopOraStats
+    showUserWhetherOraOopOraStatsIsAvailable(context.getOptions().getConf());
+
+    // Indicate what the update/merge columns are...
+    String[] updateKeyColumns =
+        OraOopUtilities.getExportUpdateKeyColumnNames(context.getOptions());
+    if (updateKeyColumns.length > 0) {
+      LOG.info(String.format(
+          "The column%s used to match rows in the HDFS file with rows in "
+              + "the Oracle table %s: %s", updateKeyColumns.length > 1 ? "s"
+              : "", updateKeyColumns.length > 1 ? "are" : "is", OraOopUtilities
+              .stringArrayToCSV(updateKeyColumns)));
+    }
+  }
+
+  private void showUserWhetherOraOopOraStatsIsAvailable(Configuration conf) {
+
+    if (OraOopUtilities.userWantsOracleSessionStatisticsReports(conf)) {
+
+      LOG.info(String.format("%s=true",
+          OraOopConstants.ORAOOP_REPORT_SESSION_STATISTICS));
+
+      // This will log a warning if it's unable to load the OraOopOraStats
+      // class...
+      OraOopUtilities.startSessionSnapshot(null);
+    }
+  }
+
+  @Override
+  protected String getCurTimestampQuery() {
+
+    return "SELECT SYSTIMESTAMP FROM DUAL";
+  }
+
+  @Override
+  protected void checkTableImportOptions(ImportJobContext context)
+      throws IOException, ImportException {
+
+    // Update the unit-test code if you modify this method.
+    super.checkTableImportOptions(context);
+  }
+
+  private void explainWhyExportClassCannotBeLoaded(NoClassDefFoundError ex,
+      String exportClassName) {
+
+    String msg =
+        String.format("Unable to load class %s.\n"
+            + "This is most likely caused by the Cloudera Shim Jar "
+            + "not being included in the Java Classpath.\n" + "Either:\n"
+            + "\tUse \"-libjars\" on the Sqoop command-line to "
+            + "include the Cloudera shim jar in the Java Classpath; or"
+            + "\n\tCopy the Cloudera shim jar into the Sqoop/lib "
+            + "directory so that it is automatically included in the "
+            + "Java Classpath; or\n"
+            + "\tObtain an updated version of Sqoop that addresses "
+            + "the Sqoop Jira \"SQOOP-127\".\n" + "\n"
+            + "The Java Classpath is:\n%s", exportClassName, OraOopUtilities
+            .getJavaClassPath());
+    LOG.fatal(msg, ex);
+  }
+}
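
The timestamp handling in toJavaType() above is governed by
oraoop.timestamp.string (ORAOOP_MAP_TIMESTAMP_AS_STRING), which defaults to
true so that the Oracle driver renders TIMESTAMP values as strings. A minimal
sketch of reverting to Sqoop's usual java.sql.Timestamp mapping, assuming the
setting is placed in oraoop-site.xml:

  <property>
    <name>oraoop.timestamp.string</name>
    <value>false</value>
  </property>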

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
new file mode 100644
index 0000000..874ef02
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+/**
+ * Constants for OraOop.
+ */
+public final class OraOopConstants {
+  private OraOopConstants() {
+  }
+
+  public static final String ORAOOP_PRODUCT_NAME =
+      "Data Connector for Oracle and Hadoop";
+  public static final String ORAOOP_JAR_FILENAME = "oraoop.jar";
+
+  // Disables OraOop - falling back to the OracleManager inside Sqoop...
+  public static final String ORAOOP_DISABLED = "oraoop.disabled";
+
+  // Whether to log Oracle session statistics using Guy Harrison's jar file...
+  public static final String ORAOOP_REPORT_SESSION_STATISTICS =
+      "oraoop.report.session.statistics";
+
+  // Disables dynamic JDBC URL generation for each mapper...
+  public static final String ORAOOP_JDBC_URL_VERBATIM =
+      "oraoop.jdbc.url.verbatim";
+
+  // The name of the Oracle RAC service each mapper should connect to, via their
+  // dynamically generated JDBC URL...
+  public static final String ORAOOP_ORACLE_RAC_SERVICE_NAME =
+      "oraoop.oracle.rac.service.name";
+
+  // The log4j log-level for OraOop...
+  public static final String ORAOOP_LOGGING_LEVEL = "oraoop.logging.level";
+
+  // The file names for the configuration properties of OraOop...
+  public static final String ORAOOP_SITE_TEMPLATE_FILENAME =
+      "oraoop-site-template.xml";
+  public static final String ORAOOP_SITE_FILENAME = "oraoop-site.xml";
+
+  // A flag that indicates that the OraOop job has been cancelled.
+  // E.g. An Oracle DBA killed our Oracle session.
+  // public static final String ORAOOP_JOB_CANCELLED = "oraoop.job.cancelled";
+
+  // The SYSDATE from the Oracle database when this OraOop job was started.
+  // This is used to generate unique names for partitions and temporary tables
+  // that we create during the job...
+  public static final String ORAOOP_JOB_SYSDATE = "oraoop.job.sysdate";
+
+  // These properties are used internally by OraOop to indicate the schema and
+  // name of
+  // the table being imported/exported...
+  public static final String ORAOOP_TABLE_OWNER = "oraoop.table.owner";
+  public static final String ORAOOP_TABLE_NAME = "oraoop.table.name";
+
+  // Constants used to indicate the desired location of the WHERE clause within
+  // the SQL generated by the record-reader.
+  // E.g. A WHERE clause like "rownum <= 10" would want to be located so that
+  // it had an impact on the total number of rows returned by the split;
+  // as opposed to impacting the number of rows returned for each of the
+  // unioned data-chunks within each split.
+  public static final String ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION =
+      "oraoop.table.import.where.clause.location";
+
+  /**
+   * Location to place the WHERE clause.
+   */
+  public enum OraOopTableImportWhereClauseLocation {
+    SUBSPLIT, SPLIT
+  }
+
+  // The SQL statements to execute for each new Oracle session that is
+  // created...
+  public static final String ORAOOP_SESSION_INITIALIZATION_STATEMENTS =
+      "oraoop.oracle.session.initialization.statements";
+
+  // Reliably stores the number of mappers requested for the sqoop map-reduce
+  // job...
+  public static final String ORAOOP_DESIRED_NUMBER_OF_MAPPERS =
+      "oraoop.desired.num.mappers";
+
+  // The minimum number of mappers required for OraOop to accept the import
+  // job...
+  public static final String ORAOOP_MIN_IMPORT_MAPPERS =
+      "oraoop.min.import.mappers";
+  public static final int MIN_NUM_IMPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+
+  // The minimum number of mappers required for OraOop to accept the export
+  // job...
+  public static final String ORAOOP_MIN_EXPORT_MAPPERS =
+      "oraoop.min.export.mappers";
+  public static final int MIN_NUM_EXPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+
+  // The query used to fetch oracle data chunks...
+  public static final String ORAOOP_ORACLE_DATA_CHUNKS_QUERY =
+      "oraoop.oracle.data.chunks.query";
+
+  // The minimum number of active instances in an Oracle RAC required for OraOop
+  // to use dynamically generated JDBC URLs...
+  public static final String ORAOOP_MIN_RAC_ACTIVE_INSTANCES =
+      "oraoop.min.rac.active.instances";
+  public static final int MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS =
+      2;
+
+  // The name of the Oracle JDBC class...
+  public static final String ORACLE_JDBC_DRIVER_CLASS =
+      "oracle.jdbc.OracleDriver";
+
+  // How many rows to pre-fetch when executing Oracle queries...
+  public static final String ORACLE_ROW_FETCH_SIZE = "oracle.row.fetch.size";
+  public static final int ORACLE_ROW_FETCH_SIZE_DEFAULT = 5000;
+
+  // OraOop does not require a "--split-by" column to be defined...
+  public static final String TABLE_SPLIT_COLUMN_NOT_REQUIRED = "not-required";
+
+  // The name of the data_chunk_id column the OraOop appends to each (import)
+  // query...
+  public static final String COLUMN_NAME_DATA_CHUNK_ID = "data_chunk_id";
+
+  // The hint that will be used on the SELECT statement for import jobs
+  public static final String IMPORT_QUERY_HINT = "oraoop.import.hint";
+
+  // Pseudo-columns added to a partitioned export table (created by OraOop from
+  // a template table)
+  // to store the partition value and subpartition value. The partition value is
+  // the sysdate when
+  // the job was performed. The subpartition value is the mapper index...
+  public static final String COLUMN_NAME_EXPORT_PARTITION =
+      "ORAOOP_EXPORT_SYSDATE";
+  public static final String COLUMN_NAME_EXPORT_SUBPARTITION =
+      "ORAOOP_MAPPER_ID";
+  public static final String COLUMN_NAME_EXPORT_MAPPER_ROW =
+      "ORAOOP_MAPPER_ROW";
+
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_VALUE =
+      "oraoop.export.partition.date.value";
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_FORMAT =
+      "yyyy-mm-dd hh24:mi:ss";
+
+  // The string we want to pass to dbms_application_info.set_module() via the
+  // "module_name" parameter...
+  public static final String ORACLE_SESSION_MODULE_NAME = ORAOOP_PRODUCT_NAME;
+
+  // The name of the configuration property containing the string we want to
+  // pass to
+  // dbms_application_info.set_module() via the "action_name" parameter...
+  public static final String ORACLE_SESSION_ACTION_NAME =
+      "oraoop.oracle.session.module.action";
+
+  // Boolean whether to do a consistent read based off an SCN
+  public static final String ORAOOP_IMPORT_CONSISTENT_READ =
+      "oraoop.import.consistent.read";
+
+  // The SCN number to use for the consistent read - calculated automatically -
+  // cannot be overridden
+  public static final String ORAOOP_IMPORT_CONSISTENT_READ_SCN =
+      "oraoop.import.consistent.read.scn";
+
+  // The method that will be used to create data chunks - ROWID ranges or
+  // partitions
+  public static final String ORAOOP_ORACLE_DATA_CHUNK_METHOD =
+      "oraoop.chunk.method";
+
+  /**
+   * How should data be split up - by ROWID range, or by partition.
+   */
+  public enum OraOopOracleDataChunkMethod {
+    ROWID, PARTITION
+  }
+
+  // Comma-separated list of partitions to be imported
+  public static final String ORAOOP_IMPORT_PARTITION_LIST =
+      "oraoop.import.partitions";
+
+  public static final OraOopOracleDataChunkMethod
+                          ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT =
+                              OraOopOracleDataChunkMethod.ROWID;
+
+  // How to allocate data-chunks into splits...
+  public static final String ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD =
+      "oraoop.block.allocation";
+
+  /**
+   * How splits should be allocated to the mappers.
+   */
+  public enum OraOopOracleBlockToSplitAllocationMethod {
+    ROUNDROBIN, SEQUENTIAL, RANDOM
+  }
+
+  // Whether to omit LOB and LONG columns during an import...
+  public static final String ORAOOP_IMPORT_OMIT_LOBS_AND_LONG =
+      "oraoop.import.omit.lobs.and.long";
+
+  // Identifies an existing Oracle table used to create a new table as the
+  // destination of a Sqoop export.
+  // Hence, use of this property implies that the "--table" does not exist in
+  // Oracle and OraOop should create it.
+  public static final String ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE =
+      "oraoop.template.table";
+
+  // If the table already exists that we want to create, should we drop it?...
+  public static final String ORAOOP_EXPORT_CREATE_TABLE_DROP =
+      "oraoop.drop.table";
+
+  // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+  // indicates whether the created Oracle
+  // tables should have NOLOGGING...
+  public static final String ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING =
+      "oraoop.no.logging";
+
+  // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+  // indicates whether the created Oracle
+  // tables should be partitioned by job and mapper...
+  public static final String ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED =
+      "oraoop.partitioned";
+
+  // Indicates (internally) that the export table we're dealing with has been
+  // partitioned by OraOop...
+  public static final String EXPORT_TABLE_HAS_ORAOOP_PARTITIONS =
+      "oraoop.export.table.has.oraoop.partitions";
+
+  // When using the Oracle hint... /* +APPEND_VALUES */ ...a commit must be
+  // performed after each batch insert.
+  // Therefore, the batches need to be quite large to avoid a performance
+  // penality (for the 'extra' commits).
+  // This is the minimum batch size to use under these conditions...
+  public static final String ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE =
+      "oraoop.min.append.values.batch.size";
+  public static final int ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT = 5000;
+
+  // The version of the Oracle database we're connected to...
+  public static final String ORAOOP_ORACLE_DATABASE_VERSION_MAJOR =
+      "oraoop.oracle.database.version.major";
+  public static final String ORAOOP_ORACLE_DATABASE_VERSION_MINOR =
+      "oraoop.oracle.database.version.minor";
+
+  // When OraOop creates a table for a Sqoop export (from a template table) and
+  // the table contains partitions,
+  // this is the prefix of those partition names. (This also allows us to later
+  // identify partitions that OraOop
+  // created.)
+  public static final String EXPORT_TABLE_PARTITION_NAME_PREFIX = "ORAOOP_";
+
+  // When OraOop creates temporary tables for each mapper during a Sqoop export
+  // this is the prefix of table names...
+  public static final String EXPORT_MAPPER_TABLE_NAME_PREFIX = "ORAOOP_";
+
+  // The format string used to turn a DATE into a string for use within the
+  // names of Oracle objects
+  // that we create. For example, temporary tables, table partitions, table
+  // subpartitions...
+  public static final String ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING =
+      "yyyymmdd_hh24miss";
+
+  // Indicates whether to perform a "merge" operation when performing a Sqoop
+  // export.
+  // If false, 'insert' statements will be used (i.e. no 'updates')...
+  public static final String ORAOOP_EXPORT_MERGE = "oraoop.export.merge";
+
+  // This property allows the user to enable parallelization during exports...
+  public static final String ORAOOP_EXPORT_PARALLEL =
+      "oraoop.export.oracle.parallelization.enabled";
+
+  // Flag used to indicate that the Oracle table contains at least one column of
+  // type BINARY_DOUBLE...
+  public static final String TABLE_CONTAINS_BINARY_DOUBLE_COLUMN =
+      "oraoop.table.contains.binary.double.column";
+  // Flag used to indicate that the Oracle table contains at least one column of
+  // type BINARY_FLOAT...
+  public static final String TABLE_CONTAINS_BINARY_FLOAT_COLUMN =
+      "oraoop.table.contains.binary.float.column";
+
+  // The storage clause to append to the end of any CREATE TABLE statements we
+  // execute for temporary Oracle tables...
+  public static final String ORAOOP_TEMPORARY_TABLE_STORAGE_CLAUSE =
+      "oraoop.temporary.table.storage.clause";
+
+  // The storage clause to append to the end of any CREATE TABLE statements we
+  // execute for permanent (export) Oracle tables...
+  public static final String ORAOOP_EXPORT_TABLE_STORAGE_CLAUSE =
+      "oraoop.table.storage.clause";
+
+  // Additional columns to include with the --update-key column...
+  public static final String ORAOOP_UPDATE_KEY_EXTRA_COLUMNS =
+      "oraoop.update.key.extra.columns";
+
+  // Should OraOop map Timestamps as java.sql.Timestamp as Sqoop does, or as
+  // String
+  public static final String ORAOOP_MAP_TIMESTAMP_AS_STRING =
+      "oraoop.timestamp.string";
+  public static final boolean ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT = true;
+
+  // This flag allows the user to force use of the APPEND_VALUES Oracle hint
+  // either ON, OFF or AUTO...
+  public static final String ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE =
+      "oraoop.oracle.append.values.hint.usage";
+
+  /**
+   * Whether to use the append values hint for exports.
+   */
+  public enum AppendValuesHintUsage {
+    AUTO, ON, OFF
+  }
+
+  // http://download.oracle.com/docs/cd/E11882_01/server.112/e17118/
+  //     sql_elements001.htm#i45441
+  public static final String SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+          +
+          // "'BFILE',"+
+          "'BINARY_DOUBLE',"
+          + "'BINARY_FLOAT',"
+          + "'BLOB',"
+          + "'CHAR',"
+          + "'CLOB',"
+          + "'DATE',"
+          + "'FLOAT',"
+          + "'LONG',"
+          +
+          // "'LONG RAW',"+
+          // "'MLSLABEL',"+
+          "'NCHAR',"
+          + "'NCLOB',"
+          + "'NUMBER',"
+          + "'NVARCHAR2',"
+          + "'RAW',"
+          + "'ROWID',"
+          +
+          // "'UNDEFINED',"+
+          "'URITYPE',"
+          +
+          // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+          // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+          "'VARCHAR2'"
+          + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+            // dba_tab_columns
+          // "'XMLTYPE',"+
+          ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+          + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+  public static final String SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+          +
+          // "'BFILE',"+
+          "'BINARY_DOUBLE',"
+          + "'BINARY_FLOAT',"
+          +
+          // "'BLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'CHAR',"
+          +
+          // "'CLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'DATE',"
+          + "'FLOAT',"
+          +
+          // "'LONG',"+ //<- "create table as select..." and
+          // "insert into table as select..." do not work when a long column
+          // exists.
+          // "'LONG RAW',"+
+          // "'MLSLABEL',"+
+          "'NCHAR',"
+          +
+          // "'NCLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'NUMBER',"
+          + "'NVARCHAR2',"
+          +
+          // "'RAW',"+
+          "'ROWID',"
+          +
+          // "'UNDEFINED',"+
+          "'URITYPE',"
+          +
+          // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+          // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+          "'VARCHAR2'"
+          + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+            // dba_tab_columns
+          // "'XMLTYPE',"+
+          ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+          + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+  // public static final int[] SUPPORTED_ORACLE_DATA_TYPES = {
+  // oracle.jdbc.OracleTypes.BIT // -7;
+  // ,oracle.jdbc.OracleTypes.TINYINT // -6;
+  // ,oracle.jdbc.OracleTypes.SMALLINT // 5;
+  // ,oracle.jdbc.OracleTypes.INTEGER // 4;
+  // ,oracle.jdbc.OracleTypes.BIGINT // -5;
+  // ,oracle.jdbc.OracleTypes.FLOAT // 6;
+  // ,oracle.jdbc.OracleTypes.REAL // 7;
+  // ,oracle.jdbc.OracleTypes.DOUBLE // 8;
+  // ,oracle.jdbc.OracleTypes.NUMERIC // 2;
+  // ,oracle.jdbc.OracleTypes.DECIMAL // 3;
+  // ,oracle.jdbc.OracleTypes.CHAR // 1;
+  // ,oracle.jdbc.OracleTypes.VARCHAR // 12;
+  // ,oracle.jdbc.OracleTypes.LONGVARCHAR // -1;
+  // ,oracle.jdbc.OracleTypes.DATE // 91;
+  // ,oracle.jdbc.OracleTypes.TIME // 92;
+  // ,oracle.jdbc.OracleTypes.TIMESTAMP // 93;
+  // // ,oracle.jdbc.OracleTypes.TIMESTAMPNS // -100; //<- Deprecated
+  // ,oracle.jdbc.OracleTypes.TIMESTAMPTZ // -101;
+  // ,oracle.jdbc.OracleTypes.TIMESTAMPLTZ // -102;
+  // ,oracle.jdbc.OracleTypes.INTERVALYM // -103;
+  // ,oracle.jdbc.OracleTypes.INTERVALDS // -104;
+  // ,oracle.jdbc.OracleTypes.BINARY // -2;
+  // /// ,oracle.jdbc.OracleTypes.VARBINARY // -3;
+  // ,oracle.jdbc.OracleTypes.LONGVARBINARY // -4;
+  // ,oracle.jdbc.OracleTypes.ROWID // -8;
+  // ,oracle.jdbc.OracleTypes.CURSOR // -10;
+  // ,oracle.jdbc.OracleTypes.BLOB // 2004;
+  // ,oracle.jdbc.OracleTypes.CLOB // 2005;
+  // // ,oracle.jdbc.OracleTypes.BFILE // -13;
+  // // ,oracle.jdbc.OracleTypes.STRUCT // 2002;
+  // // ,oracle.jdbc.OracleTypes.ARRAY // 2003;
+  // ,oracle.jdbc.OracleTypes.REF // 2006;
+  // ,oracle.jdbc.OracleTypes.NCHAR // -15;
+  // ,oracle.jdbc.OracleTypes.NCLOB // 2011;
+  // ,oracle.jdbc.OracleTypes.NVARCHAR // -9;
+  // ,oracle.jdbc.OracleTypes.LONGNVARCHAR // -16;
+  // // ,oracle.jdbc.OracleTypes.SQLXML // 2009;
+  // // ,oracle.jdbc.OracleTypes.OPAQUE // 2007;
+  // // ,oracle.jdbc.OracleTypes.JAVA_STRUCT // 2008;
+  // // ,oracle.jdbc.OracleTypes.JAVA_OBJECT // 2000;
+  // // ,oracle.jdbc.OracleTypes.PLSQL_INDEX_TABLE // -14;
+  // ,oracle.jdbc.OracleTypes.BINARY_FLOAT // 100;
+  // ,oracle.jdbc.OracleTypes.BINARY_DOUBLE // 101;
+  // ,oracle.jdbc.OracleTypes.NULL // 0;
+  // ,oracle.jdbc.OracleTypes.NUMBER // 2;
+  // // ,oracle.jdbc.OracleTypes.RAW // -2;
+  // // ,oracle.jdbc.OracleTypes.OTHER // 1111;
+  // ,oracle.jdbc.OracleTypes.FIXED_CHAR // 999;
+  // // ,oracle.jdbc.OracleTypes.DATALINK // 70;
+  // ,oracle.jdbc.OracleTypes.BOOLEAN // 16;
+  // };
+
+  /**
+   * Constants relating to Sqoop itself.
+   */
+  public static final class Sqoop {
+    private Sqoop() {
+    }
+
+    /**
+     * What type of Sqoop tool is being run.
+     */
+    public enum Tool {
+      UNKNOWN, IMPORT, EXPORT
+    }
+
+    public static final String IMPORT_TOOL_NAME = "import";
+    public static final String MAX_MAPREDUCE_ATTEMPTS =
+        "mapred.map.max.attempts";
+  }
+
+  /**
+   * Constants relating to Oracle.
+   */
+  public static final class Oracle {
+    private Oracle() {
+    }
+
+    public static final int ROWID_EXTENDED_ROWID_TYPE = 1;
+    public static final int ROWID_MAX_ROW_NUMBER_PER_BLOCK = 32767;
+
+    // The token used to comment out a line of SQL text in Oracle.
+    public static final String ORACLE_SQL_STATEMENT_COMMENT_TOKEN = "--";
+
+    public static final String OBJECT_TYPE_TABLE = "TABLE";
+
+    public static final String URITYPE = "URITYPE";
+
+    public static final int MAX_IDENTIFIER_LENGTH = 30; // <- Max length of an
+                                                        // Oracle name
+                                                        // (table-name,
+                                                        // partition-name etc.)
+
+    public static final String HINT_SYNTAX = "/*+ %s */ "; // Syntax for a hint
+                                                           // in Oracle
+  }
+
+  /**
+   * Logging constants.
+   */
+  public static class Logging {
+    /**
+     * Level of log to output.
+     */
+    public enum Level {
+      TRACE, DEBUG, INFO, WARN, ERROR, FATAL
+    }
+  }
+
+}
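
For context: the SUPPORTED_DATA_TYPES constant assembled above is a WHERE-clause
fragment over the DATA_TYPE column of Oracle's data-dictionary views, evidently
used to limit OraOop to column types it supports. The sketch below is
illustrative only and is not part of the patch: it shows how such a predicate
could be applied against ALL_TAB_COLUMNS with plain JDBC. The connection URL,
credentials, owner, table name and the shortened predicate are all placeholder
values; the connector's own dictionary queries live in OraOopOracleQueries.java.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class SupportedTypesSketch {

      // Shortened stand-in for OraOopConstants.SUPPORTED_DATA_TYPES.
      private static final String SUPPORTED_TYPES_PREDICATE =
          "(DATA_TYPE IN ('CHAR','DATE','NUMBER','VARCHAR2')"
              + " OR DATA_TYPE LIKE 'TIMESTAMP(%)')";

      public static void main(String[] args) throws Exception {
        // Placeholder connection details - substitute a real Oracle JDBC URL.
        try (Connection conn = DriverManager.getConnection(
            "jdbc:oracle:thin:@//dbhost:1521/orcl", "scott", "tiger")) {

          // List the columns of a table whose data types pass the filter.
          String sql = "SELECT COLUMN_NAME, DATA_TYPE"
              + " FROM ALL_TAB_COLUMNS"
              + " WHERE OWNER = ? AND TABLE_NAME = ?"
              + " AND " + SUPPORTED_TYPES_PREDICATE;

          try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, "SCOTT");
            stmt.setString(2, "TST_PRODUCT");
            try (ResultSet rs = stmt.executeQuery()) {
              while (rs.next()) {
                System.out.println(rs.getString("COLUMN_NAME")
                    + " : " + rs.getString("DATA_TYPE"));
              }
            }
          }
        }
      }
    }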

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java b/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
new file mode 100644
index 0000000..93efa76
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+
+class OraOopDBInputSplit extends DBInputFormat.DBInputSplit {
+
+  private int splitId;
+  private double totalNumberOfBlocksInAllSplits;
+  private String splitLocation;
+  private List<OraOopOracleDataChunk> oracleDataChunks;
+
+  // NB: Update write(), readFields() and getDebugDetails() if you add fields
+  // here.
+
+  public OraOopDBInputSplit() {
+
+    this.splitId = -1;
+    this.splitLocation = "";
+    this.oracleDataChunks = new ArrayList<OraOopOracleDataChunk>();
+  }
+
+  public OraOopDBInputSplit(List<OraOopOracleDataChunk> dataChunks) {
+
+    setOracleDataChunks(dataChunks);
+  }
+
+  public void setOracleDataChunks(List<OraOopOracleDataChunk> dataChunks) {
+
+    this.oracleDataChunks = dataChunks;
+  }
+
+  public List<OraOopOracleDataChunk> getDataChunks() {
+
+    return this.oracleDataChunks;
+  }
+
+  public int getNumberOfDataChunks() {
+
+    if (this.getDataChunks() == null) {
+      return 0;
+    } else {
+      return this.getDataChunks().size();
+    }
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+
+    if (this.splitLocation.isEmpty()) {
+      return new String[] {};
+    } else {
+      return new String[] { this.splitLocation };
+    }
+
+  }
+
+  /**
+   * @return The total number of blocks within the data-chunks of this split
+   */
+  @Override
+  public long getLength() {
+
+    return this.getTotalNumberOfBlocksInThisSplit();
+  }
+
+  public int getTotalNumberOfBlocksInThisSplit() {
+
+    if (this.getNumberOfDataChunks() == 0) {
+      return 0;
+    }
+
+    int result = 0;
+    for (OraOopOracleDataChunk dataChunk : this.getDataChunks()) {
+      result += dataChunk.getNumberOfBlocks();
+    }
+
+    return result;
+  }
+
+  public OraOopOracleDataChunk findDataChunkById(String id) {
+
+    for (OraOopOracleDataChunk dataChunk : this.getDataChunks()) {
+      if (dataChunk.getId().equals(id)) {
+        return dataChunk;
+      }
+    }
+    return null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput output) throws IOException {
+
+    output.writeInt(splitId);
+
+    if (this.oracleDataChunks == null) {
+      output.writeInt(0);
+    } else {
+      output.writeInt(this.oracleDataChunks.size());
+      for (OraOopOracleDataChunk dataChunk : this.oracleDataChunks) {
+        Text.writeString(output, dataChunk.getClass().getName());
+        dataChunk.write(output);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void readFields(DataInput input) throws IOException {
+
+    this.splitId = input.readInt();
+
+    int dataChunkCount = input.readInt();
+    if (dataChunkCount == 0) {
+      this.oracleDataChunks = null;
+    } else {
+      Class<? extends OraOopOracleDataChunk> dataChunkClass;
+      OraOopOracleDataChunk dataChunk;
+      this.oracleDataChunks =
+          new ArrayList<OraOopOracleDataChunk>(dataChunkCount);
+      for (int idx = 0; idx < dataChunkCount; idx++) {
+        try {
+          dataChunkClass =
+              (Class<? extends OraOopOracleDataChunk>) Class.forName(Text
+                  .readString(input));
+          dataChunk = dataChunkClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        dataChunk.readFields(input);
+        this.oracleDataChunks.add(dataChunk);
+      }
+    }
+  }
+
+  public String getDebugDetails() {
+
+    StringBuilder result = new StringBuilder();
+
+    if (this.getNumberOfDataChunks() == 0) {
+      result.append(String.format(
+          "Split[%s] does not contain any Oracle data-chunks.", this.splitId));
+    } else {
+      result.append(String.format(
+          "Split[%s] includes the Oracle data-chunks:\n", this.splitId));
+      for (OraOopOracleDataChunk dataChunk : getDataChunks()) {
+        result.append(dataChunk.toString());
+      }
+    }
+    return result.toString();
+  }
+
+  protected int getSplitId() {
+    return this.splitId;
+  }
+
+  protected void setSplitId(int newSplitId) {
+    this.splitId = newSplitId;
+  }
+
+  protected void setSplitLocation(String newSplitLocation) {
+    this.splitLocation = newSplitLocation;
+  }
+
+  protected void setTotalNumberOfBlocksInAllSplits(
+      int newTotalNumberOfBlocksInAllSplits) {
+    this.totalNumberOfBlocksInAllSplits = newTotalNumberOfBlocksInAllSplits;
+  }
+
+}
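
A note on the split above: Hadoop ships each OraOopDBInputSplit from the job
client to its mapper through the write()/readFields() pair, and only splitId and
the list of data-chunks are serialized (splitLocation and
totalNumberOfBlocksInAllSplits are not, as the "NB" comment warns). A minimal
round-trip sketch follows; SplitSerializationSketch is a hypothetical class, not
part of the patch, and sits in the connector's package only because the split
class is package-private and setSplitId() is protected.

    package org.apache.sqoop.manager.oracle;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public final class SplitSerializationSketch {

      public static void main(String[] args) throws Exception {
        // Build a split with no data-chunks and give it an id.
        OraOopDBInputSplit original = new OraOopDBInputSplit();
        original.setSplitId(7);

        // Serialize it the way the MapReduce framework would.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance and confirm the id survived.
        OraOopDBInputSplit copy = new OraOopDBInputSplit();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println("split id after round-trip: " + copy.getSplitId());
        System.out.println("data-chunks after round-trip: "
            + copy.getNumberOfDataChunks());
      }
    }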

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java b/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
new file mode 100644
index 0000000..45a88ef
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBRecordReader;
+import org.apache.sqoop.manager.oracle.OraOopConstants.
+           OraOopTableImportWhereClauseLocation;
+import org.apache.sqoop.manager.oracle.OraOopUtilities.OraOopStatsReports;
+
+/*
+ * NOTES:
+ *
+ *   T is the output-type of this record reader.
+ *
+ *   getFieldNames() is overridden to insert a "data_chunk_id" column
+ *   containing the id (integer) of the Oracle data-chunk the data
+ *   was obtained from. This is used to calculate the "percentage complete"
+ *   for this mapper.
+ *
+ *   getSelectQuery() is overridden to inject the actual data_chunk_id number
+ *   into the query that is executed (for the data-chunk being processed).
+ *
+ *   This class extends DBRecordReader. Unfortunately, DBRecordReader does
+ *   not expose its results property (of type ResultSet), so we have to
+ *   override executeQuery() in order to obtain a reference to the data
+ *   obtained when the SQL generated by getSelectQuery() is executed.
+ */
+class OraOopDBRecordReader<T extends SqoopRecord> extends
+    DataDrivenDBRecordReader<T> {
+
+  private static final OraOopLog LOG = OraOopLogFactory
+      .getLog(OraOopDBRecordReader.class);
+
+  private OraOopDBInputSplit dbInputSplit; // <- The split this record-reader is
+                                           // working on.
+  private int numberOfBlocksInThisSplit; // <- The number of Oracle blocks in
+                                         // this Oracle data-chunk.
+  private int numberOfBlocksProcessedInThisSplit; // <- How many Oracle blocks
+                                                  // we've processed with this
+                                                  // record-reader.
+  private String currentDataChunkId; // <- The id of the current data-chunk
+                                     // being processed
+  private ResultSet results; // <- The ResultSet containing the data from the
+                             // query returned by getSelectQuery()
+  private int columnIndexDataChunkIdZeroBased = -1; // <- The zero-based column
+                                                    // index of the
+                                                    // data_chunk_id column.
+  private boolean progressCalculationErrorLogged; // <- Whether we've logged a
+                                                  // problem with the progress
+                                                  // calculation during
+                                                  // nextKeyValue().
+  private Object oraOopOraStats; // <- A reference to the Oracle statistics
+                                 // object that is being tracked for this Oracle
+                                 // session.
+  private boolean profilingEnabled; // <- Whether to collect profiling metrics
+  private long timeSpentInNextKeyValueInNanoSeconds; // <- Total time spent in
+                                                     // super.nextKeyValue()
+
+  public OraOopDBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn,
+      DBConfiguration dbConfig, String cond, String[] fields, String table)
+      throws SQLException {
+
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+        "ORACLE-ORAOOP");
+
+    OraOopUtilities.enableDebugLoggingIfRequired(conf);
+
+    this.dbInputSplit = castSplit(split);
+
+    String thisOracleInstanceName =
+        OraOopOracleQueries.getCurrentOracleInstanceName(conn);
+    LOG.info(String.format(
+        "This record reader is connected to Oracle via the JDBC URL: \n"
+            + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", conn
+            .toString(), thisOracleInstanceName));
+
+    OracleConnectionFactory.initializeOracleConnection(conn, conf);
+
+    if (OraOopUtilities.userWantsOracleSessionStatisticsReports(conf)) {
+      this.oraOopOraStats = OraOopUtilities.startSessionSnapshot(conn);
+    }
+
+    this.numberOfBlocksInThisSplit =
+        this.dbInputSplit.getTotalNumberOfBlocksInThisSplit();
+    this.numberOfBlocksProcessedInThisSplit = 0;
+
+    this.profilingEnabled = conf.getBoolean("oraoop.profiling.enabled", false);
+  }
+
+  public static OraOopDBInputSplit castSplit(DBInputSplit split) {
+
+    // Check there's a split available...
+    if (split == null) {
+      throw new IllegalArgumentException("The DBInputSplit cannot be null.");
+    }
+
+    // Check that the split is the correct type...
+    Class<?> desiredSplitClass = OraOopDBInputSplit.class;
+    if (!(split.getClass() == desiredSplitClass)) {
+      String errMsg =
+          String.format("The type of Split available within %s "
+              + "should be an instance of class %s, "
+              + "but is actually an instance of class %s", OraOopUtilities
+              .getCurrentMethodName(), desiredSplitClass.getName(), split
+              .getClass().getName());
+      throw new RuntimeException(errMsg);
+    }
+
+    // TODO Cast this using desiredSplitClass, so we only need one line of
+    // code that identifies the type of the split class...
+    // inputSplit = (desiredSplitClass)this.getSplit();
+    return (OraOopDBInputSplit) split;
+  }
+
+  @Override
+  protected String[] getFieldNames() {
+
+    String[] fieldNames = super.getFieldNames();
+    ArrayList<String> result = new ArrayList<String>();
+
+    for (int idx = 0; idx < fieldNames.length; idx++) {
+      result.add(fieldNames[idx]);
+    }
+
+    result.add(OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID);
+    this.columnIndexDataChunkIdZeroBased = result.size() - 1;
+
+    return result.toArray(new String[result.size()]);
+  }
+
+  @Override
+  protected String getSelectQuery() {
+
+    boolean consistentRead =
+        this.getDBConf().getConf().getBoolean(
+            OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ, false);
+    long consistentReadScn =
+        this.getDBConf().getConf().getLong(
+            OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, 0L);
+    if (consistentRead && consistentReadScn == 0L) {
+      throw new RuntimeException("Could not get SCN for consistent read.");
+    }
+
+    StringBuilder query = new StringBuilder();
+
+    if (this.dbInputSplit.getDataChunks() == null) {
+      String errMsg =
+          String.format("The %s does not contain any data-chunks, within %s.",
+              this.dbInputSplit.getClass().getName(), OraOopUtilities
+                  .getCurrentMethodName());
+      throw new RuntimeException(errMsg);
+    }
+
+    OraOopConstants.OraOopTableImportWhereClauseLocation whereClauseLocation =
+        OraOopUtilities.getOraOopTableImportWhereClauseLocation(this
+            .getDBConf().getConf(),
+            OraOopConstants.OraOopTableImportWhereClauseLocation.SUBSPLIT);
+
+    OracleTable tableContext = getOracleTableContext();
+    OracleTableColumns tableColumns = null;
+    try {
+
+      Configuration conf = this.getDBConf().getConf();
+
+      tableColumns =
+          OraOopOracleQueries.getTableColumns(getConnection(), tableContext,
+              OraOopUtilities.omitLobAndLongColumnsDuringImport(conf),
+              OraOopUtilities.recallSqoopJobType(conf),
+              true,   // <- onlyOraOopSupportedTypes
+              true);  // <- omitOraOopPseudoColumns
+    } catch (SQLException ex) {
+      LOG.error(String.format(
+          "Unable to obtain the data-types of the columns in table %s.\n"
+              + "Error:\n%s", tableContext.toString(), ex.getMessage()));
+      throw new RuntimeException(ex);
+    }
+
+    int numberOfDataChunks = this.dbInputSplit.getNumberOfDataChunks();
+    for (int idx = 0; idx < numberOfDataChunks; idx++) {
+
+      OraOopOracleDataChunk dataChunk =
+          this.dbInputSplit.getDataChunks().get(idx);
+
+      if (idx > 0) {
+        query.append("UNION ALL \n");
+      }
+
+      // Append the SELECT clause for this data-chunk...
+      query.append(getColumnNamesClause(tableColumns, dataChunk.getId()))
+          .append("\n");
+
+      query.append(" FROM ").append(this.getTableName()).append(" ");
+
+      if (consistentRead) {
+        query.append("AS OF SCN ").append(consistentReadScn).append(" ");
+      }
+
+      query.append(getPartitionClauseForDataChunk(this.dbInputSplit, idx))
+          .append(" t").append("\n");
+
+      query.append(" WHERE (").append(
+          getWhereClauseForDataChunk(this.dbInputSplit, idx)).append(")\n");
+
+      // If the user wants the WHERE clause applied to each data-chunk...
+      if (whereClauseLocation
+              == OraOopTableImportWhereClauseLocation.SUBSPLIT) {
+        String conditions = this.getConditions();
+        if (conditions != null && conditions.length() > 0) {
+          query.append(" AND (").append(conditions).append(")\n");
+        }
+      }
+
+    }
+
+    // If the user wants the WHERE clause applied to the whole split...
+    if (whereClauseLocation == OraOopTableImportWhereClauseLocation.SPLIT) {
+      String conditions = this.getConditions();
+      if (conditions != null && conditions.length() > 0) {
+
+        // Insert a "select everything" line at the start of the SQL query...
+        query.insert(0, getColumnNamesClause(tableColumns, null) + " FROM (\n");
+
+        // ...and then apply the WHERE clause to all the UNIONed sub-queries...
+        query.append(")\n").append("WHERE\n").append(conditions).append("\n");
+      }
+    }
+
+    LOG.info("SELECT QUERY = \n" + query.toString());
+
+    return query.toString();
+  }
+
+  private String getColumnNamesClause(OracleTableColumns tableColumns,
+      String dataChunkId) {
+
+    StringBuilder result = new StringBuilder();
+
+    result.append("SELECT ");
+    result.append(OraOopUtilities.getImportHint(this.getDBConf().getConf()));
+
+    String[] fieldNames = this.getFieldNames();
+
+    int firstFieldIndex = 0;
+    int lastFieldIndex = fieldNames.length - 1;
+    for (int i = firstFieldIndex; i <= lastFieldIndex; i++) {
+      if (i > firstFieldIndex) {
+        result.append(",");
+      }
+      String fieldName = fieldNames[i];
+
+      OracleTableColumn oracleTableColumn =
+          tableColumns.findColumnByName(fieldName);
+      if (oracleTableColumn != null) {
+        if (oracleTableColumn.getDataType().equals(
+            OraOopConstants.Oracle.URITYPE)) {
+          fieldName = String.format("uritype.geturl(%s)", fieldName);
+        }
+      }
+
+      // If this field is the "data_chunk_id" column that we inserted during
+      // getFieldNames(), then we need to insert the value of that
+      // data_chunk_id now...
+      if (i == this.columnIndexDataChunkIdZeroBased
+          && fieldName.equals(OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID)) {
+        if (dataChunkId != null && !dataChunkId.isEmpty()) {
+          fieldName =
+              String.format("'%s' %s", dataChunkId,
+                  OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID);
+        }
+      }
+
+      result.append(fieldName);
+    }
+    return result.toString();
+  }
+
+  private String getPartitionClauseForDataChunk(OraOopDBInputSplit split,
+      int dataChunkIndex) {
+    OraOopOracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex);
+    return dataChunk.getPartitionClause();
+  }
+
+  private String getWhereClauseForDataChunk(OraOopDBInputSplit split,
+      int dataChunkIndex) {
+
+    OraOopOracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex);
+    return dataChunk.getWhereClause();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPos() throws IOException {
+
+    // This split contains multiple data-chunks.
+    // Each data-chunk contains multiple blocks.
+    // Return the number of blocks that have been processed by this split...
+    return numberOfBlocksProcessedInThisSplit;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public float getProgress() throws IOException {
+
+    return numberOfBlocksProcessedInThisSplit
+        / (float) numberOfBlocksInThisSplit;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+
+    boolean result = false;
+    try {
+
+      long startTime = 0;
+      if (this.profilingEnabled) {
+        startTime = System.nanoTime();
+      }
+
+      result = super.nextKeyValue();
+
+      if (this.profilingEnabled) {
+        this.timeSpentInNextKeyValueInNanoSeconds +=
+            System.nanoTime() - startTime;
+      }
+
+      // Keep track of which data-chunk we're processing, and therefore how many
+      // Oracle blocks we've processed. This can be used to calculate our
+      // "percentage complete"...
+      if (result && this.results != null) {
+
+        String thisDataChunkId = null;
+        try {
+          // Column indexes are 1-based in JDBC...
+          thisDataChunkId =
+              this.results.getString(this.columnIndexDataChunkIdZeroBased + 1);
+        } catch (SQLException ex) {
+          if (!progressCalculationErrorLogged) {
+            // This prevents us from flooding the log with the same message
+            // thousands of times...
+            progressCalculationErrorLogged = true;
+
+            LOG.warn(String
+                .format(
+                  "Unable to obtain the value of the %s column in method %s.\n"
+                      + "\tthis.columnIndexDataChunkIdZeroBased = %d (NB: "
+                      + "jdbc field indexes are 1-based)\n\tAs a consequence, "
+                      + "progress for the record-reader cannot be calculated.\n"
+                      + "\tError=\n%s",
+                  OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID, OraOopUtilities
+                      .getCurrentMethodName(),
+                  this.columnIndexDataChunkIdZeroBased, ex.getMessage()));
+          }
+        }
+
+        if (thisDataChunkId != null
+            && !thisDataChunkId.equals(this.currentDataChunkId)) {
+          if (this.currentDataChunkId != null
+              && !this.currentDataChunkId.isEmpty()) {
+            OraOopOracleDataChunk dataChunk =
+                this.dbInputSplit.findDataChunkById(thisDataChunkId);
+            if (dataChunk != null) {
+              this.numberOfBlocksProcessedInThisSplit +=
+                  dataChunk.getNumberOfBlocks();
+            }
+          }
+          this.currentDataChunkId = thisDataChunkId;
+        }
+      }
+    } catch (IOException ex) {
+      if (OraOopUtilities.oracleSessionHasBeenKilled(ex)) {
+        LOG.info("\n*********************************************************"
+            + "\nThe Oracle session in use has been killed by a 3rd party."
+            + "\n*********************************************************");
+      }
+      throw ex;
+    }
+
+    return result;
+  }
+
+  @Override
+  protected ResultSet executeQuery(String query) throws SQLException {
+
+    try {
+      this.results = super.executeQuery(query);
+      return this.results;
+    } catch (SQLException ex) {
+      LOG.error(String.format("Error in %s while executing the SQL query:\n"
+          + "%s\n\n" + "%s", OraOopUtilities.getCurrentMethodName(), query, ex
+          .getMessage()));
+      throw ex;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    if (this.profilingEnabled) {
+      LOG.info(String.format(
+          "Time spent in super.nextKeyValue() = %s seconds.",
+          this.timeSpentInNextKeyValueInNanoSeconds / Math.pow(10, 9)));
+    }
+
+    if (OraOopUtilities.userWantsOracleSessionStatisticsReports(getDBConf()
+        .getConf())) {
+      OraOopStatsReports reports =
+          OraOopUtilities.stopSessionSnapshot(this.oraOopOraStats);
+      this.oraOopOraStats = null;
+
+      LOG.info(String.format("Oracle Statistics Report for OraOop:\n\n%s",
+          reports.getPerformanceReport()));
+
+      String fileName =
+          String.format("oracle-stats-csv-%d", this.dbInputSplit.getSplitId());
+      OraOopUtilities.writeOutputFile(this.getDBConf().getConf(), fileName,
+          reports.getCsvReport());
+
+      fileName =
+          String.format("oracle-stats-%d", this.dbInputSplit.getSplitId());
+      OraOopUtilities.writeOutputFile(this.getDBConf().getConf(), fileName,
+          reports.getPerformanceReport());
+    }
+
+    super.close();
+  }
+
+  public OracleTable getOracleTableContext() {
+
+    Configuration conf = this.getDBConf().getConf();
+    OracleTable result =
+        new OracleTable(conf.get(OraOopConstants.ORAOOP_TABLE_OWNER), conf
+            .get(OraOopConstants.ORAOOP_TABLE_NAME));
+    return result;
+  }
+
+}
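
To make the record-reader concrete: for a split holding several data-chunks,
getSelectQuery() emits one SELECT per chunk joined by UNION ALL, tags each
branch with a literal data_chunk_id column (the extra field injected by
getFieldNames(), which nextKeyValue() later reads back to track progress), and
optionally adds a flashback "AS OF SCN" clause plus the user's --where
conditions at either the sub-split or split level. The sketch below only mirrors
that shape; the table, columns and ROWID ranges are made up, and it is not the
connector's actual query generator.

    // Hypothetical mirror of the query shape for a two-chunk split.
    public final class SelectQueryShapeSketch {

      public static void main(String[] args) {
        String[] chunkIds = {"1", "2"};
        String[] chunkPredicates = {
            "(rowid >= CHARTOROWID('AAAV5eAAEAAAAB5AAA') AND"
                + " rowid <= CHARTOROWID('AAAV5eAAEAAAAC4CcP'))",
            "(rowid >= CHARTOROWID('AAAV5eAAEAAAAC5AAA') AND"
                + " rowid <= CHARTOROWID('AAAV5eAAEAAAAD4CcP'))",
        };
        boolean consistentRead = true;  // cf. ORAOOP_IMPORT_CONSISTENT_READ
        long scn = 1234567L;            // SCN captured when the job started

        StringBuilder query = new StringBuilder();
        for (int idx = 0; idx < chunkIds.length; idx++) {
          if (idx > 0) {
            query.append("UNION ALL \n");
          }
          // Each branch selects the table columns plus a literal data_chunk_id.
          query.append("SELECT PRODUCT_ID, PRODUCT_NAME, '")
              .append(chunkIds[idx]).append("' data_chunk_id\n")
              .append(" FROM TST_PRODUCT ");
          if (consistentRead) {
            query.append("AS OF SCN ").append(scn).append(" ");
          }
          query.append("t\n")
              .append(" WHERE (").append(chunkPredicates[idx]).append(")\n");
        }
        System.out.println(query);
      }
    }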