Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/05 18:40:59 UTC

[5/6] sqoop git commit: SQOOP-2595: Add Oracle connector to Sqoop 2

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
new file mode 100644
index 0000000..b741dc8
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.type.Column;
+import org.joda.time.LocalDateTime;
+
+public class OracleJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcLoader.class);
+
+  private long rowsWritten = 0;
+  private LoaderContext context;
+  private Connection connection;
+  private OracleVersion oracleVersion;
+  private OracleTable table; // <- If exporting into a partitioned
+                               // table, this table will be unique for
+                               // this mapper
+  private OracleTableColumns tableColumns; // <- The columns in the
+                                           // table we're inserting rows
+                                           // into
+  private int mapperId; // <- The index of this Hadoop mapper
+  private boolean tableHasMapperRowNumberColumn; // <- Whether the export
+                                                   // table contains the column
+                                                   // SQOOP_MAPPER_ROW
+  private long mapperRowNumber; // <- The 1-based row number being processed
+                                  // by this mapper. It's inserted into the
+                                  // "SQOOP_MAPPER_ROW" column
+  private boolean useAppendValuesOracleHint = false; // <- Whether to use the
+                                                     // " /*+APPEND_VALUES*/ " hint
+                                                     // within the Oracle SQL
+                                                     // statement we generate
+  private long numberOfRowsSkipped; // <- The number of rows encountered
+                                    // during insertData() that had a NULL
+                                    // value for (one of) the update columns.
+                                    // These rows were therefore skipped.
+  private String[] updateColumnNames;
+  private int rowsPerBatch;
+  private int rowsPerCommit;
+
+
+  private void setupInsert(LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) throws SQLException {
+    // Is each mapper inserting rows into a unique table?...
+    InsertMode insertMode = OracleUtilities.getExportInsertMode(
+        jobConfiguration.toJobConfig, context.getContext());
+
+    if (insertMode == InsertMode.ExchangePartition) {
+      Object sysDateTime =
+          OracleUtilities.recallOracleDateTime(context.getContext());
+      table = OracleUtilities.generateExportTableMapperTableName(
+          mapperId, sysDateTime, null);
+
+    } else {
+      table = OracleUtilities.decodeOracleTableName(
+          linkConfiguration.connectionConfig.username,
+          jobConfiguration.toJobConfig.tableName);
+    }
+    tableColumns = OracleQueries.getToTableColumns(
+        connection, table, true, false);
+    tableHasMapperRowNumberColumn =
+        tableColumns.findColumnByName(
+            OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW) != null;
+
+    // Should we use the APPEND_VALUES Oracle hint?...
+    useAppendValuesOracleHint = false;
+    if (insertMode == InsertMode.ExchangePartition) {
+      // NB: "Direct inserts" cannot utilize APPEND_VALUES, otherwise Oracle
+      // will serialize
+      // the N mappers, causing a lot of lock contention.
+      useAppendValuesOracleHint = canUseOracleAppendValuesHint();
+    }
+  }
+
+  private void setupUpdate(LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) throws SQLException {
+    UpdateMode updateMode = OracleUtilities.getExportUpdateMode(
+        jobConfiguration.toJobConfig);
+
+    Object sysDateTime =
+        OracleUtilities.recallOracleDateTime(context.getContext());
+    table = OracleUtilities.generateExportTableMapperTableName(
+        mapperId, sysDateTime, null);
+
+    updateColumnNames = OracleUtilities.
+        getExportUpdateKeyColumnNames(jobConfiguration.toJobConfig);
+
+    tableColumns = OracleQueries.getToTableColumns(
+        connection, table, true, false);
+
+    if (updateMode == UpdateMode.Merge || updateMode == UpdateMode.Update) {
+      // Should we use the APPEND_VALUES Oracle hint?...
+      useAppendValuesOracleHint = canUseOracleAppendValuesHint();
+    }
+  }
+
+  @Override
+  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) throws Exception {
+    LOG.debug("Running Oracle JDBC connector loader");
+    this.context = context;
+
+    //TODO: Mapper ID
+    mapperId = 1;
+    //TODO: Hardcoded values
+    rowsPerBatch = 5000;
+    rowsPerCommit = 5000;
+
+    // Retrieve the JDBC URL that should be used by this mapper.
+    // A mapper-specific URL property is stored in the job context
+    // (see OracleUtilities.getMapperJdbcUrlPropertyName()); we read it
+    // back here before establishing the connection to the database...
+    String mapperJdbcUrlPropertyName =
+        OracleUtilities.getMapperJdbcUrlPropertyName(mapperId);
+
+    // Get this mapper's JDBC URL
+    String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null);
+
+    LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
+        mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));
+
+    connection = OracleConnectionFactory.createOracleJdbcConnection(
+        OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+        mapperJdbcUrl,
+        linkConfiguration.connectionConfig.username,
+        linkConfiguration.connectionConfig.password);
+    String thisOracleInstanceName =
+        OracleQueries.getCurrentOracleInstanceName(connection);
+    LOG.info(String.format(
+        "This loader is connected to Oracle via the JDBC URL: \n"
+            + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", connection
+            .toString(), thisOracleInstanceName));
+    OracleConnectionFactory.initializeOracleConnection(
+        connection, linkConfiguration.connectionConfig);
+    connection.setAutoCommit(false);
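+    // Auto-commit is disabled because insertData() commits explicitly: once
+    // every rowsPerCommit rows and again after the final batch.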
+    oracleVersion = OracleQueries.getOracleVersion(connection);
+
+    if (jobConfiguration.toJobConfig.updateKey == null ||
+        jobConfiguration.toJobConfig.updateKey.isEmpty()) {
+      setupInsert(linkConfiguration, jobConfiguration);
+    } else {
+      setupUpdate(linkConfiguration, jobConfiguration);
+    }
+
+    // Has the user forced the use of APPEND_VALUES either on or off?...
+    useAppendValuesOracleHint =
+        allowUserToOverrideUseOfTheOracleAppendValuesHint(
+            jobConfiguration.toJobConfig,
+            useAppendValuesOracleHint);
+
+    try {
+      insertData();
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
+
+  private void insertData() throws Exception {
+    // If using APPEND_VALUES, check the batch size and commit frequency...
+    if (useAppendValuesOracleHint) {
+      if (rowsPerBatch < OracleJdbcConnectorConstants.
+          MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT) {
+        LOG.info(String
+            .format(
+                "The number of rows per batch-insert has been changed from %d "
+                    + "to %d. This is in response "
+                    + "to the Oracle APPEND_VALUES hint being used.",
+                    rowsPerBatch, OracleJdbcConnectorConstants.
+                    MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT));
+        rowsPerBatch = OracleJdbcConnectorConstants.
+            MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT;
+      }
+      // Need to commit after each batch when using APPEND_VALUES
+      if (rowsPerCommit != rowsPerBatch) {
+        LOG.info(String
+            .format(
+                "The number of rows to insert per commit has been "
+                    + "changed from %d to %d. This is in response "
+                    + "to the Oracle APPEND_VALUES hint being used.",
+                    rowsPerCommit, rowsPerBatch));
+        rowsPerCommit = rowsPerBatch;
+      }
+    }
+
+    mapperRowNumber = 1;
+
+    String sql = getBatchInsertSqlStatement(useAppendValuesOracleHint
+        ? "/*+APPEND_VALUES*/" : "");
+    PreparedStatement statement = connection.prepareStatement(sql);
+
+    Column[] columns = context.getSchema().getColumnsArray();
+    Object[] array;
+    boolean checkUpdateColumns = false;
+    List<Integer> updateColumnIndexes = null;
+    if (updateColumnNames != null) {
+      checkUpdateColumns = true;
+      updateColumnIndexes = new ArrayList<Integer>();
+      for (int idx = 0; idx < this.updateColumnNames.length; idx++) {
+        for (int i = 0; i < columns.length; i++) {
+          if (columns[i].getName().equals(updateColumnNames[idx])) {
+            updateColumnIndexes.add(i);
+          }
+        }
+      }
+    }
+
+    while ((array = context.getDataReader().readArrayRecord()) != null) {
+      if (checkUpdateColumns) {
+        boolean updateKeyValueIsNull = false;
+        for (Integer i : updateColumnIndexes) {
+          Object updateKeyValue = array[i];
+          if (updateKeyValue == null) {
+            this.numberOfRowsSkipped++;
+            updateKeyValueIsNull = true;
+            break;
+          }
+        }
+
+        if (updateKeyValueIsNull) {
+          continue;
+        }
+      }
+      rowsWritten++;
+      configurePreparedStatementColumns(statement, columns, array);
+      if (rowsWritten % rowsPerBatch == 0) {
+        statement.executeBatch();
+      }
+      if (rowsWritten % rowsPerCommit == 0) {
+        connection.commit();
+      }
+    }
+    if (rowsWritten % rowsPerBatch != 0) {
+      statement.executeBatch();
+    }
+    connection.commit();
+    statement.close();
+
+    if (numberOfRowsSkipped > 0) {
+      LOG.warn(String.format(
+          "%d records were skipped due to a NULL value within one of the "
+        + "update-key column(s).\nHaving a NULL value prevents a record "
+        + "from being able to be matched to a row in the Oracle table.",
+              numberOfRowsSkipped));
+    }
+  }
+
+  private String getBatchInsertSqlStatement(String oracleHint) {
+
+    StringBuilder sqlNames = new StringBuilder();
+    StringBuilder sqlValues = new StringBuilder();
+
+    /*
+     * NOTE: "this.tableColumns" may contain a different list of columns
+     * than the job's schema. This is because: (1) the schema may include
+     * columns with data-types that are not supported by this connector.
+     * (2) "this.tableColumns" includes any pseudo-columns that we've added
+     * to the export table (and don't exist in the data being read). For
+     * example, if exporting to a partitioned table (that this connector
+     * created), there are two pseudo-columns we added to the table to
+     * identify the export job and the mapper.
+     */
+
+    int colCount = 0;
+    for (int idx = 0; idx < this.tableColumns.size(); idx++) {
+      OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
+      String columnName = oracleTableColumn.getName();
+
+      // column names...
+      if (colCount > 0) {
+        sqlNames.append("\n,");
+      }
+      sqlNames.append(columnName);
+
+      // column values...
+      if (colCount > 0) {
+        sqlValues.append("\n,");
+      }
+
+      String pseudoColumnValue =
+          generateInsertValueForPseudoColumn(columnName);
+
+      String bindVarName = null;
+
+      if (pseudoColumnValue != null) {
+        bindVarName = pseudoColumnValue;
+      } else if (oracleTableColumn.getOracleType() == OracleQueries
+          .getOracleType("STRUCT")) {
+        if (oracleTableColumn.getDataType().equals(
+            OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+          bindVarName =
+              String.format("urifactory.getUri(%s)",
+                  columnNameToBindVariable(columnName));
+        }
+        //TODO: Date as string?
+      /*} else if (getConf().getBoolean(
+          OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING,
+          OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT)) {
+        if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+            .getOracleType("DATE")) {
+          bindVarName =
+              String.format("to_date(%s, 'yyyy-mm-dd hh24:mi:ss')",
+                  columnNameToBindVariable(columnName));
+        } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+            .getOracleType("TIMESTAMP")) {
+          bindVarName =
+              String.format("to_timestamp(%s, 'yyyy-mm-dd hh24:mi:ss.ff')",
+                  columnNameToBindVariable(columnName));
+        } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+            .getOracleType("TIMESTAMPTZ")) {
+          bindVarName =
+              String.format(
+                  "to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')",
+                  columnNameToBindVariable(columnName));
+        } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+            .getOracleType("TIMESTAMPLTZ")) {
+          bindVarName =
+              String.format(
+                  "to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')",
+                  columnNameToBindVariable(columnName));
+        }*/
+      }
+
+      if (bindVarName == null) {
+        bindVarName = columnNameToBindVariable(columnName);
+      }
+
+      sqlValues.append(bindVarName);
+
+      colCount++;
+    }
+
+    String sql =
+        String.format("insert %s into %s\n" + "(%s)\n" + "values\n"
+            + "(%s)\n", oracleHint, this.table.toString(), sqlNames
+            .toString(), sqlValues.toString());
+
+    LOG.info("Batch-Mode insert statement:\n" + sql);
+    return sql;
+  }
+
+  private String generateInsertValueForPseudoColumn(String columnName) {
+
+    if (columnName.equalsIgnoreCase(
+        OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION)) {
+
+      String partitionValueStr =
+          context.getString(
+              OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE);
+      if (partitionValueStr == null) {
+        throw new RuntimeException(
+            "Unable to recall the value of the partition date-time.");
+      }
+
+      return String.format("to_date('%s', '%s')", partitionValueStr,
+          OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
+    }
+
+    if (columnName.equalsIgnoreCase(
+        OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION)) {
+      return Integer.toString(this.mapperId);
+    }
+
+    return null;
+  }
+
+  private String columnNameToBindVariable(String columnName) {
+    return ":" + columnName;
+  }
+
+  private void configurePreparedStatementColumns(
+      PreparedStatement statement, Column[] columns, Object[] array)
+      throws SQLException {
+
+    String bindValueName;
+
+    if (this.tableHasMapperRowNumberColumn) {
+      bindValueName = columnNameToBindVariable(OracleJdbcConnectorConstants.
+          COLUMN_NAME_EXPORT_MAPPER_ROW).replaceFirst(":", "");
+      try {
+        OracleQueries.setLongAtName(statement, bindValueName,
+            this.mapperRowNumber);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      this.mapperRowNumber++;
+    }
+
+    for (int i = 0; i < array.length; i++) {
+      String colName = columns[i].getName();
+      bindValueName = columnNameToBindVariable(colName).replaceFirst(":", "");
+      OracleTableColumn oracleTableColumn =
+          tableColumns.findColumnByName(colName);
+      setBindValueAtName(statement, bindValueName, array[i],
+          oracleTableColumn);
+    }
+    statement.addBatch();
+  }
+
+  private void setBindValueAtName(PreparedStatement statement,
+      String bindValueName, Object bindValue, OracleTableColumn column)
+      throws SQLException {
+    if (column.getOracleType()
+        == OracleQueries.getOracleType("NUMBER")) {
+      OracleQueries.setBigDecimalAtName(statement, bindValueName,
+          (BigDecimal) bindValue);
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("VARCHAR")) {
+      OracleQueries.setStringAtName(statement, bindValueName,
+          (String) bindValue);
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("TIMESTAMP")
+        || column.getOracleType() == OracleQueries
+            .getOracleType("TIMESTAMPTZ")
+        || column.getOracleType() == OracleQueries
+            .getOracleType("TIMESTAMPLTZ")) {
+      Object objValue = bindValue;
+      if (objValue instanceof LocalDateTime) {
+        //TODO: Improve date handling
+        LocalDateTime value = (LocalDateTime) objValue;
+        Timestamp timestampValue =
+            new Timestamp(value.toDateTime().getMillis());
+        OracleQueries.setTimestampAtName(statement, bindValueName,
+            timestampValue);
+      } else {
+        String value = (String) objValue;
+
+        if (value == null || value.equalsIgnoreCase("null")) {
+          value = "";
+        }
+
+        OracleQueries.setStringAtName(statement, bindValueName, value);
+      }
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("BINARY_DOUBLE")) {
+      Double value = (Double) bindValue;
+      if (value != null) {
+        OracleQueries.setBinaryDoubleAtName(statement, bindValueName,
+            value);
+      } else {
+        OracleQueries.setObjectAtName(statement, bindValueName, null);
+      }
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("BINARY_FLOAT")) {
+      Float value = (Float) bindValue;
+      if (value != null) {
+        OracleQueries.setBinaryFloatAtName(statement, bindValueName,
+            value);
+      } else {
+        OracleQueries.setObjectAtName(statement, bindValueName, null);
+      }
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("STRUCT")) { // <- E.g. URITYPE
+      if (column.getDataType().equals(
+          OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+        String value = (String) bindValue;
+        OracleQueries.setStringAtName(statement, bindValueName, value);
+      } else {
+        String msg =
+            String.format(
+                "%s needs to be updated to cope with the data-type: %s "
+                    + "where the Oracle data_type is \"%s\".",
+                OracleUtilities.getCurrentMethodName(), column.getDataType(),
+                column.getOracleType());
+        LOG.error(msg);
+        throw new UnsupportedOperationException(msg);
+      }
+    } else {
+      // LOB data-types are currently not supported during
+      // a Sqoop Export.
+      // JIRA: SQOOP-117
+      // OraOopConstants.SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE
+      // will already have excluded all LOB columns.
+
+      // case oracle.jdbc.OracleTypes.CLOB:
+      // {
+      // oracle.sql.CLOB clob = new
+      // oracle.sql.CLOB(connection);
+      // Object value = fieldMap.get(colName);
+      // //clob.set
+      // statement.setCLOBAtName(bindValueName, clob);
+      // break;
+      // }
+      String msg =
+          String.format(
+              "%s may need to be updated to cope with the data-type: %s",
+              OracleUtilities.getCurrentMethodName(), column.getOracleType());
+      LOG.debug(msg);
+
+      OracleQueries
+          .setObjectAtName(statement, bindValueName, bindValue);
+    }
+  }
+
+  private boolean canUseOracleAppendValuesHint() {
+
+    // Should we use the APPEND_VALUES Oracle hint?...
+    // (Yes, if this is Oracle 11.2 or above)...
+    boolean result = oracleVersion.isGreaterThanOrEqualTo(11, 2, 0, 0);
+
+    // If there is a BINARY_DOUBLE or BINARY_FLOAT column, then we'll avoid
+    // using the APPEND_VALUES hint. If there is a NULL in the HDFS file,
+    // then we'll encounter
+    // "ORA-12838: cannot read/modify an object after modifying it in parallel"
+    // due to the JDBC driver issuing the INSERT statement twice to the database
+    // without a COMMIT in between (as was observed via WireShark).
+    // We're not sure why this happens - we just know how to avoid it.
+    if (result) {
+      boolean binaryDoubleColumnExists = false;
+      boolean binaryFloatColumnExists = false;
+      for (int idx = 0; idx < this.tableColumns.size(); idx++) {
+        OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
+        if (oracleTableColumn.getOracleType()
+            == OracleQueries.getOracleType("BINARY_DOUBLE")) {
+          binaryDoubleColumnExists = true;
+        }
+        if (oracleTableColumn.getOracleType()
+            == OracleQueries.getOracleType("BINARY_FLOAT")) {
+          binaryFloatColumnExists = true;
+        }
+      }
+
+      if (binaryDoubleColumnExists || binaryFloatColumnExists) {
+        result = false;
+        LOG.info("The APPEND_VALUES Oracle hint will not be used for the "
+            + "INSERT SQL statement, as the Oracle table "
+            + "contains either a BINARY_DOUBLE or BINARY_FLOAT column.");
+      }
+    }
+
+    return result;
+  }
+
+  protected boolean allowUserToOverrideUseOfTheOracleAppendValuesHint(
+      ToJobConfig jobConfig, boolean useAppendValuesOracleHint) {
+
+    boolean result = useAppendValuesOracleHint;
+
+    // Has the user forced the use of APPEND_VALUES either on or off?...
+    switch (OracleUtilities.getOracleAppendValuesHintUsage(jobConfig)) {
+
+      case OFF:
+        result = false;
+        LOG.debug(String
+            .format(
+                "Use of the APPEND_VALUES Oracle hint has been forced OFF. "
+                + "(It was %s to used).",
+                useAppendValuesOracleHint ? "going" : "not going"));
+        break;
+
+      case ON:
+        result = true;
+        LOG.debug(String
+            .format(
+                "Use of the APPEND_VALUES Oracle hint has been forced ON. "
+                + "(It was %s to used).",
+                useAppendValuesOracleHint ? "going" : "not going"));
+        break;
+
+      case AUTO:
+        LOG.debug(String.format("The APPEND_VALUES Oracle hint %s be used.",
+            result ? "will" : "will not"));
+        break;
+
+      default:
+        throw new RuntimeException("Invalid value for APPEND_VALUES.");
+    }
+    return result;
+  }
+}
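
For reference, getBatchInsertSqlStatement() above emits an Oracle INSERT that
names every column once and binds each value through a named bind variable
(":COLUMN"), with the APPEND_VALUES hint included only when enabled. As a
sketch, for a hypothetical two-column table EMPLOYEES that also carries the
SQOOP_MAPPER_ROW pseudo-column, the generated statement text would look like:

    insert /*+APPEND_VALUES*/ into EMPLOYEES
    (ID
    ,NAME
    ,SQOOP_MAPPER_ROW)
    values
    (:ID
    ,:NAME
    ,:SQOOP_MAPPER_ROW)

Each row's values are then bound via the OracleQueries.set*AtName() helpers
and executed in batches of rowsPerBatch, with a commit every rowsPerCommit
rows.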

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
new file mode 100644
index 0000000..9aeacaf
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.job.etl.Partition;
+
+public class OracleJdbcPartition extends Partition {
+
+  private int splitId;
+  private double totalNumberOfBlocksInAllSplits;
+  private String splitLocation;
+  private List<OracleDataChunk> oracleDataChunks;
+
+  // NB: Update write(), readFields() and toString() if you add fields
+  // here.
+
+  public OracleJdbcPartition() {
+
+    this.splitId = -1;
+    this.splitLocation = "";
+    this.oracleDataChunks = new ArrayList<OracleDataChunk>();
+  }
+
+  public OracleJdbcPartition(List<OracleDataChunk> dataChunks) {
+
+    setOracleDataChunks(dataChunks);
+  }
+
+  public void setOracleDataChunks(List<OracleDataChunk> dataChunks) {
+
+    this.oracleDataChunks = dataChunks;
+  }
+
+  public List<OracleDataChunk> getDataChunks() {
+
+    return this.oracleDataChunks;
+  }
+
+  public int getNumberOfDataChunks() {
+
+    if (this.getDataChunks() == null) {
+      return 0;
+    } else {
+      return this.getDataChunks().size();
+    }
+  }
+
+  /**
+   * @return The total number of blocks within the data-chunks of this split
+   */
+  public long getLength() {
+
+    return this.getTotalNumberOfBlocksInThisSplit();
+  }
+
+  public int getTotalNumberOfBlocksInThisSplit() {
+
+    if (this.getNumberOfDataChunks() == 0) {
+      return 0;
+    }
+
+    int result = 0;
+    for (OracleDataChunk dataChunk : this.getDataChunks()) {
+      result += dataChunk.getNumberOfBlocks();
+    }
+
+    return result;
+  }
+
+  public OracleDataChunk findDataChunkById(String id) {
+
+    for (OracleDataChunk dataChunk : this.getDataChunks()) {
+      if (dataChunk.getId().equals(id)) {
+        return dataChunk;
+      }
+    }
+    return null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput output) throws IOException {
+
+    output.writeInt(splitId);
+
+    if (this.oracleDataChunks == null) {
+      output.writeInt(0);
+    } else {
+      output.writeInt(this.oracleDataChunks.size());
+      for (OracleDataChunk dataChunk : this.oracleDataChunks) {
+        output.writeUTF(dataChunk.getClass().getName());
+        dataChunk.write(output);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  /** {@inheritDoc} */
+  public void readFields(DataInput input) throws IOException {
+
+    this.splitId = input.readInt();
+
+    int dataChunkCount = input.readInt();
+    if (dataChunkCount == 0) {
+      this.oracleDataChunks = null;
+    } else {
+      Class<? extends OracleDataChunk> dataChunkClass;
+      OracleDataChunk dataChunk;
+      this.oracleDataChunks =
+          new ArrayList<OracleDataChunk>(dataChunkCount);
+      for (int idx = 0; idx < dataChunkCount; idx++) {
+        try {
+          dataChunkClass =
+              (Class<? extends OracleDataChunk>) Class.forName(input.readUTF());
+          dataChunk = dataChunkClass.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        dataChunk.readFields(input);
+        this.oracleDataChunks.add(dataChunk);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+
+    StringBuilder result = new StringBuilder();
+
+    if (this.getNumberOfDataChunks() == 0) {
+      result.append(String.format(
+          "Split[%s] does not contain any Oracle data-chunks.", this.splitId));
+    } else {
+      result.append(String.format(
+          "Split[%s] includes the Oracle data-chunks:\n", this.splitId));
+      for (OracleDataChunk dataChunk : getDataChunks()) {
+        result.append(dataChunk.toString());
+      }
+    }
+    return result.toString();
+  }
+
+  protected int getSplitId() {
+    return this.splitId;
+  }
+
+  protected void setSplitId(int newSplitId) {
+    this.splitId = newSplitId;
+  }
+
+  protected void setSplitLocation(String newSplitLocation) {
+    this.splitLocation = newSplitLocation;
+  }
+
+  protected void setTotalNumberOfBlocksInAllSplits(
+      int newTotalNumberOfBlocksInAllSplits) {
+    this.totalNumberOfBlocksInAllSplits = newTotalNumberOfBlocksInAllSplits;
+  }
+
+}
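
The write()/readFields() pair above follows the Hadoop Writable pattern: the
split id is written first, then the chunk count, then each data-chunk's
concrete class name followed by its own serialized fields. A minimal
round-trip sketch (hypothetical driver code; exception handling elided and a
populated "chunks" list assumed):

    java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
    OracleJdbcPartition original = new OracleJdbcPartition(chunks);
    original.write(new java.io.DataOutputStream(buffer));         // serialize

    OracleJdbcPartition restored = new OracleJdbcPartition();
    restored.readFields(new java.io.DataInputStream(
        new java.io.ByteArrayInputStream(buffer.toByteArray()))); // deserialize

Note that a partition written with zero chunks is restored with a null chunk
list (readFields() sets it to null rather than an empty list); callers go
through getNumberOfDataChunks(), which treats null as zero.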

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
new file mode 100644
index 0000000..00c7752
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+public class OracleJdbcPartitioner extends
+    Partitioner<LinkConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcPartitioner.class);
+
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) {
+    try {
+      Connection connection = OracleConnectionFactory.makeConnection(
+          linkConfiguration.connectionConfig);
+      OracleTable table = OracleUtilities.decodeOracleTableName(
+          linkConfiguration.connectionConfig.username,
+          jobConfiguration.fromJobConfig.tableName);
+
+      long desiredNumberOfMappers = context.getMaxPartitions();
+      List<String> partitionList = getPartitionList(
+          jobConfiguration.fromJobConfig);
+
+      List<Partition> splits = null;
+      try {
+        OracleConnectionFactory.initializeOracleConnection(connection,
+            linkConfiguration.connectionConfig);
+
+        // The number of chunks generated will *not* be a multiple of the
+        // number of splits, to ensure that each split doesn't always get
+        // data from the start of each data-file...
+        long numberOfChunksPerOracleDataFile = (desiredNumberOfMappers * 2) + 1;
+
+        // Get the Oracle data-chunks for the table...
+        List<? extends OracleDataChunk> dataChunks;
+        if (OracleUtilities.getOraOopOracleDataChunkMethod(
+            jobConfiguration.fromJobConfig).equals(
+            OracleUtilities.OracleDataChunkMethod.PARTITION)) {
+          dataChunks =
+              OracleQueries.getOracleDataChunksPartition(connection, table,
+                  partitionList);
+        } else {
+          dataChunks =
+              OracleQueries.getOracleDataChunksExtent(connection, table,
+                  partitionList, numberOfChunksPerOracleDataFile);
+        }
+
+        if (dataChunks.size() == 0) {
+          String errMsg;
+          if (OracleUtilities.getOraOopOracleDataChunkMethod(
+              jobConfiguration.fromJobConfig).equals(
+                  OracleUtilities.OracleDataChunkMethod.PARTITION)) {
+            errMsg =
+                String
+                    .format(
+                        "The table %s does not contain any partitions and you "
+                        + "have specified to chunk the table by partitions.",
+                        table.getName());
+          } else {
+            errMsg =
+                String.format("The table %s does not contain any data.", table
+                    .getName());
+          }
+          LOG.fatal(errMsg);
+          throw new RuntimeException(errMsg);
+        } else {
+          OracleUtilities.OracleBlockToSplitAllocationMethod
+              blockAllocationMethod = OracleUtilities
+                  .getOracleBlockToSplitAllocationMethod(
+                      jobConfiguration.fromJobConfig,
+                      OracleUtilities.
+                          OracleBlockToSplitAllocationMethod.ROUNDROBIN);
+
+          // Group the Oracle data-chunks into splits...
+          splits =
+              groupTableDataChunksIntoSplits(dataChunks, desiredNumberOfMappers,
+                  blockAllocationMethod);
+
+          /*String oraoopLocations =
+              jobContext.getConfiguration().get("oraoop.locations", "");
+          String[] locations = oraoopLocations.split(",");
+          for (int idx = 0; idx < locations.length; idx++) {
+            if (idx < splits.size()) {
+              String location = locations[idx].trim();
+              if (!location.isEmpty()) {
+                ((OraOopDBInputSplit) splits.get(idx)).setSplitLocation(location);
+
+                LOG.info(String
+                    .format("Split[%d] has been assigned location \"%s\".", idx,
+                        location));
+              }
+            }
+          }*/
+
+        }
+      } catch (SQLException ex) {
+        throw new RuntimeException(ex);
+      }
+
+      return splits;
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\nError:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+  }
+
+  private List<String> getPartitionList(FromJobConfig jobConfig) {
+    LOG.debug("Partition list = " + jobConfig.partitionList);
+    List<String> result =
+        OracleUtilities.splitOracleStringList(jobConfig.partitionList);
+    if (result != null && result.size() > 0) {
+      LOG.debug("Partition filter list: " + result.toString());
+    }
+    return result;
+  }
+
+  protected static List<Partition> groupTableDataChunksIntoSplits(
+      List<? extends OracleDataChunk> dataChunks,
+      long desiredNumberOfSplits,
+      OracleUtilities.OracleBlockToSplitAllocationMethod
+          blockAllocationMethod) {
+
+    long numberOfDataChunks = dataChunks.size();
+    long actualNumberOfSplits =
+        Math.min(numberOfDataChunks, desiredNumberOfSplits);
+    long totalNumberOfBlocksInAllDataChunks = 0;
+    for (OracleDataChunk dataChunk : dataChunks) {
+      totalNumberOfBlocksInAllDataChunks += dataChunk.getNumberOfBlocks();
+    }
+
+    String debugMsg = String.format(
+        "The table being imported by sqoop has %d blocks "
+      + "that have been divided into %d chunks "
+      + "which will be processed in %d splits. "
+      + "The chunks will be allocated to the splits using the method: %s",
+        totalNumberOfBlocksInAllDataChunks, numberOfDataChunks,
+        actualNumberOfSplits, blockAllocationMethod.toString());
+    LOG.info(debugMsg);
+
+    List<Partition> splits =
+        new ArrayList<Partition>((int) actualNumberOfSplits);
+
+    for (int i = 0; i < actualNumberOfSplits; i++) {
+      OracleJdbcPartition split = new OracleJdbcPartition();
+      //split.setSplitId(i);
+      //split.setTotalNumberOfBlocksInAllSplits(
+      //    totalNumberOfBlocksInAllDataChunks);
+      splits.add(split);
+    }
+
+    switch (blockAllocationMethod) {
+
+      case RANDOM:
+        // Randomize the order of the data chunks and then "fall through" into
+        // the ROUNDROBIN block below...
+        Collections.shuffle(dataChunks);
+
+        // NB: No "break;" statement here - we're intentionally falling into
+        // the ROUNDROBIN block below...
+
+      //$FALL-THROUGH$
+      case ROUNDROBIN:
+        int idxSplitRoundRobin = 0;
+        for (OracleDataChunk dataChunk : dataChunks) {
+
+          if (idxSplitRoundRobin >= splits.size()) {
+            idxSplitRoundRobin = 0;
+          }
+          OracleJdbcPartition split =
+              (OracleJdbcPartition) splits.get(idxSplitRoundRobin++);
+
+          split.getDataChunks().add(dataChunk);
+        }
+        break;
+
+      case SEQUENTIAL:
+        double dataChunksPerSplit = dataChunks.size() / (double) splits.size();
+        int dataChunksAllocatedToSplits = 0;
+
+        int idxSplitSeq = 0;
+        for (OracleDataChunk dataChunk : dataChunks) {
+
+          OracleJdbcPartition split =
+              (OracleJdbcPartition) splits.get(idxSplitSeq);
+          split.getDataChunks().add(dataChunk);
+
+          dataChunksAllocatedToSplits++;
+
+          if (dataChunksAllocatedToSplits
+                  >= (dataChunksPerSplit * (idxSplitSeq + 1))
+              && idxSplitSeq < splits.size()) {
+            idxSplitSeq++;
+          }
+        }
+        break;
+
+      default:
+        throw new RuntimeException("Block allocation method not implemented.");
+
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (int idx = 0; idx < splits.size(); idx++) {
+        LOG.debug("\n\t" + splits.get(idx).toString());
+      }
+    }
+
+    return splits;
+  }
+
+}
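
To make the three allocation methods concrete: given five data-chunks c1..c5
and two splits, ROUNDROBIN deals them out as {c1, c3, c5} and {c2, c4};
SEQUENTIAL keeps runs together as {c1, c2, c3} and {c4, c5} (dataChunksPerSplit
is 2.5, so the first split absorbs the extra chunk); RANDOM shuffles the
chunks first and then performs the same round-robin deal. A hypothetical call:

    List<Partition> splits = groupTableDataChunksIntoSplits(
        dataChunks, 2,
        OracleUtilities.OracleBlockToSplitAllocationMethod.SEQUENTIAL);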

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
new file mode 100644
index 0000000..8429a38
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries.CreateExportChangesTableOptions;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.ExportTableUpdateTechnique;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class OracleJdbcToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+
+  private static final Logger LOG =
+    Logger.getLogger(OracleJdbcToDestroyer.class);
+
+  protected Connection connection;
+  protected OracleTable table;
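+  //TODO: Number of mappers is hardcoded here and needs to be fixed
+  //      (see the matching TODO in OracleJdbcToInitializer).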
+  protected int numMappers = 8;
+
+  private void connect(ConnectionConfig connectionConfig) {
+    try {
+      connection = OracleConnectionFactory.makeConnection(connectionConfig);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\n"
+              + "Error:%s", connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+  }
+
+  @Override
+  public void destroy(DestroyerContext context,
+      LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) {
+    LOG.debug("Running Oracle JDBC connector destroyer");
+
+    table = OracleUtilities.decodeOracleTableName(
+        linkConfiguration.connectionConfig.username,
+        jobConfiguration.toJobConfig.tableName);
+
+    if (jobConfiguration.toJobConfig.updateKey == null ||
+        jobConfiguration.toJobConfig.updateKey.isEmpty()) {
+
+      // Is each mapper inserting rows into a unique table?...
+      InsertMode insertMode = OracleUtilities.getExportInsertMode(
+          jobConfiguration.toJobConfig, context.getContext());
+
+      if (insertMode == InsertMode.ExchangePartition) {
+        connect(linkConfiguration.connectionConfig);
+        Object sysDateTime =
+            OracleUtilities.recallOracleDateTime(context.getContext());
+
+        exchangePartitionUniqueMapperTableDataIntoMainExportTable(sysDateTime);
+
+      }
+
+    } else {
+      connect(linkConfiguration.connectionConfig);
+      Object sysDateTime =
+          OracleUtilities.recallOracleDateTime(context.getContext());
+      try {
+        updateMainExportTableFromUniqueMapperTable(jobConfiguration.toJobConfig,
+            context.getContext(), sysDateTime);
+      } catch (SQLException e) {
+        throw new RuntimeException(
+            String.format(
+                "Unable to update the table %s.",table.toString()), e);
+      }
+    }
+  }
+
+  private void exchangePartitionUniqueMapperTableDataIntoMainExportTable(
+      Object sysDateTime) {
+
+    for (int i = 0; i < numMappers; i++) {
+      long start = System.nanoTime();
+
+      OracleTable mapperTable =
+          OracleUtilities.generateExportTableMapperTableName(
+              i, sysDateTime, null);
+
+      String subPartitionName =
+          OracleUtilities.generateExportTableSubPartitionName(
+              i, sysDateTime);
+
+      try {
+        OracleQueries.exchangeSubpartition(connection,
+            table, subPartitionName, mapperTable);
+
+        double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+        LOG.info(String
+            .format(
+                "Time spent performing an \"exchange subpartition with "
+                + "table\": %f sec.",
+                timeInSec));
+
+        LOG.debug(String.format("Dropping temporary mapper table %s",
+            mapperTable.toString()));
+        OracleQueries.dropTable(connection, mapperTable);
+      } catch (SQLException ex) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Unable to perform an \"exchange subpartition\" operation "
+                        + "for the table %s, for the subpartition named "
+                        + "\"%s\" with the table named \"%s\".",
+                    table.toString(), subPartitionName,
+                    mapperTable.toString()), ex);
+      }
+    }
+  }
+
+  private void updateMainExportTableFromUniqueMapperTable(ToJobConfig jobConfig,
+      ImmutableContext context, Object sysDateTime)
+      throws SQLException {
+
+    String[] updateColumnNames = OracleUtilities.
+        getExportUpdateKeyColumnNames(jobConfig);
+
+    OracleTableColumns tableColumns = OracleQueries.getToTableColumns(
+        connection, table, true, false);
+
+    UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);
+
+    ExportTableUpdateTechnique exportTableUpdateTechnique =
+        OracleUtilities.getExportTableUpdateTechnique(context, updateMode);
+
+    CreateExportChangesTableOptions changesTableOptions;
+    boolean parallelizationEnabled =
+        OracleUtilities.enableOracleParallelProcessingDuringExport(jobConfig);
+
+    switch (exportTableUpdateTechnique) {
+
+      case ReInsertUpdatedRows:
+      case UpdateSql:
+        changesTableOptions =
+            CreateExportChangesTableOptions.OnlyRowsThatDiffer;
+        break;
+
+      case ReInsertUpdatedRowsAndNewRows:
+      case MergeSql:
+        changesTableOptions =
+            CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
+        break;
+
+      default:
+        throw new RuntimeException(String.format(
+            "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+            OracleUtilities.getCurrentMethodName(),
+            exportTableUpdateTechnique.toString()));
+    }
+
+    String temporaryTableStorageClause =
+        OracleUtilities.getTemporaryTableStorageClause(jobConfig);
+
+    for (int i = 0; i < numMappers; i++) {
+
+      OracleTable mapperTable =
+          OracleUtilities.generateExportTableMapperTableName(
+              i, sysDateTime, null);
+
+      OracleTable changesTable =
+          OracleUtilities.generateExportTableMapperTableName(Integer
+              .toString(i) + "_CHG", sysDateTime, null);
+
+      try {
+        int changeTableRowCount =
+            OracleQueries.createExportChangesTable(connection,
+                changesTable, temporaryTableStorageClause, mapperTable,
+                table, updateColumnNames, changesTableOptions,
+                parallelizationEnabled);
+
+        if (changeTableRowCount == 0) {
+          LOG.debug(String.format(
+              "The changes-table does not contain any rows. %s is now exiting.",
+                  OracleUtilities.getCurrentMethodName()));
+          continue;
+        }
+
+        switch (exportTableUpdateTechnique) {
+
+          case ReInsertUpdatedRows:
+          case ReInsertUpdatedRowsAndNewRows:
+
+            OracleQueries.deleteRowsFromTable(connection,
+                table, changesTable, updateColumnNames,
+                parallelizationEnabled);
+
+            OracleQueries.insertRowsIntoExportTable(connection,
+                table, changesTable, sysDateTime, i,
+                parallelizationEnabled);
+            break;
+
+          case UpdateSql:
+
+            long start = System.nanoTime();
+
+            OracleQueries.updateTable(connection, table,
+                changesTable, updateColumnNames, tableColumns, sysDateTime, i,
+                parallelizationEnabled);
+
+            double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+            LOG.info(String.format("Time spent performing an update: %f sec.",
+                timeInSec));
+            break;
+
+          case MergeSql:
+
+            long mergeStart = System.nanoTime();
+
+            OracleQueries.mergeTable(connection, table,
+                changesTable, updateColumnNames, tableColumns, sysDateTime,
+                i, parallelizationEnabled);
+
+            double mergeTimeInSec = (System.nanoTime() - mergeStart)
+                / Math.pow(10, 9);
+            LOG.info(String.format("Time spent performing a merge: %f sec.",
+                mergeTimeInSec));
+
+            break;
+
+          default:
+            throw new RuntimeException(
+              String.format(
+                "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+                        OracleUtilities.getCurrentMethodName(),
+                        exportTableUpdateTechnique.toString()));
+        }
+
+        connection.commit();
+      } catch (SQLException ex) {
+        connection.rollback();
+        throw ex;
+      } finally {
+        OracleQueries.dropTable(connection, changesTable);
+        LOG.debug(String.format("Dropping temporary mapper table %s",
+            mapperTable.toString()));
+        OracleQueries.dropTable(connection, mapperTable);
+      }
+    }
+  }
+
+}
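
For context, the "exchange subpartition" step above relies on Oracle DDL of
the following general form (a sketch with placeholder names; the actual
statement is assembled in OracleQueries.exchangeSubpartition(), which is not
part of this hunk):

    ALTER TABLE target_export_table
      EXCHANGE SUBPARTITION subpartition_name
      WITH TABLE mapper_staging_table

Exchanging a (sub)partition is a data-dictionary operation rather than a data
copy, which is why the per-iteration timing logged above is expected to stay
small even for large mapper tables.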

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
new file mode 100644
index 0000000..f1d92f0
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartition;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartitions;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+public class OracleJdbcToInitializer extends
+    OracleJdbcCommonInitializer<ToJobConfiguration> {
+
+  private static final Logger LOG =
+    Logger.getLogger(OracleJdbcToInitializer.class);
+
+  @Override
+  public void connect(InitializerContext context,
+      LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration)
+          throws SQLException {
+    super.connect(context, linkConfiguration, jobConfiguration);
+    table = OracleUtilities.decodeOracleTableName(
+        linkConfiguration.connectionConfig.username,
+        jobConfiguration.toJobConfig.tableName);
+  }
+
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) {
+    super.initialize(context, linkConfiguration, jobConfiguration);
+    LOG.debug("Running Oracle JDBC connector initializer");
+    try {
+      createAnyRequiredOracleObjects(context.getContext(),
+          jobConfiguration.toJobConfig, linkConfiguration.connectionConfig);
+
+      if (!isSqoopTableAnOracleTable(connection,
+          linkConfiguration.connectionConfig.username, table)) {
+        throw new RuntimeException("Can only load data into Oracle tables.");
+      }
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private void createAnyRequiredOracleObjects(MutableContext context,
+      ToJobConfig jobConfig, ConnectionConfig connectionConfig)
+          throws SQLException {
+
+      // The SYSDATE on the Oracle database will be used as the partition value
+      // for this export job...
+      Object sysDateTime = OracleQueries.getSysDate(connection);
+      String sysDateStr =
+        OracleQueries.oraDATEToString(sysDateTime, "yyyy-mm-dd hh24:mi:ss");
+      context.setString(OracleJdbcConnectorConstants.SQOOP_ORACLE_JOB_SYSDATE,
+          sysDateStr);
+
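+      // Warn about any staging tables left behind by earlier failed jobs;
+      // they are only reported (see the method below), never dropped
+      // automatically.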
+      checkForOldOraOopTemporaryOracleTables(connection, sysDateTime,
+          OracleQueries.getCurrentSchema(connection));
+
+      // Store the actual partition value, so the N mappers know what value to
+      // insert...
+      String partitionValue =
+          OracleQueries.oraDATEToString(sysDateTime,
+              OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
+      context.setString(
+          OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE,
+          partitionValue);
+
+      // Generate the (22 character) partition name...
+      String partitionName =
+          OracleUtilities
+              .createExportTablePartitionNameFromOracleTimestamp(sysDateTime);
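+      // The partition name embeds the job's SYSDATE timestamp (for example
+      // something like "ORAOOP_20151105_184059"; the name shown here is
+      // illustrative, the exact format is defined in OracleUtilities), so
+      // each job run gets its own partition.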
+
+      //TODO: Number of mappers needs to be fixed
+      int numMappers = 8;
+
+      String exportTableTemplate = jobConfig.templateTable;
+
+      if (exportTableTemplate == null) {
+        exportTableTemplate = "";
+      }
+
+      String user = connectionConfig.username;
+      //TODO: This is from the other Oracle Manager
+      //if (user == null) {
+      //  user = OracleManager.getSessionUser(connection);
+      //}
+
+      OracleTable templateTableContext =
+          OracleUtilities.decodeOracleTableName(user, exportTableTemplate);
+
+      boolean noLoggingOnNewTable = BooleanUtils.isTrue(jobConfig.nologging);
+
+      String updateKeyCol = jobConfig.updateKey;
+
+      /* =========================== */
+      /* VALIDATION OF INPUTS */
+      /* =========================== */
+
+      if (updateKeyCol == null || updateKeyCol.isEmpty()) {
+        // We're performing an "insert" export, not an "update" export.
+
+        // Check that the "oraoop.export.merge" property has not been
+        // specified, as this would be an invalid scenario...
+        if (OracleUtilities.getExportUpdateMode(jobConfig) == UpdateMode.Merge) {
+          throw new RuntimeException("The merge option can only be used if "
+              + "an update key is specified.");
+        }
+      }
+
+      if (OracleUtilities
+          .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)
+          || OracleUtilities
+              .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {
+
+        // OraOop will create the export table.
+
+        if (table.getName().length()
+                > OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
+          String msg =
+              String.format(
+                  "The Oracle table name \"%s\" is longer than %d characters.\n"
+                + "Oracle will not allow a table with this name to be created.",
+              table.getName(),
+              OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH);
+          throw new RuntimeException(msg);
+        }
+
+        if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
+
+          // We're performing an "update" export, not an "insert" export.
+
+          // Check whether the user is attempting an "update" (i.e. a
+          // non-merge) export. If so, they're asking to UPDATE rows in an
+          // (about to be created) empty table. That would be a waste of
+          // time, as we'd be performing UPDATE operations against a table
+          // with no rows in it...
+          UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);
+          if (updateMode == UpdateMode.Update) {
+            throw new RuntimeException(String.format(
+                "\n\nCombining the template table option with the merge "
+              + "option is nonsensical, as this would create an "
+              + "empty table and then perform "
+              + "a lot of work that results in a table containing no rows.\n"));
+          }
+        }
+
+        // Check that the specified template table actually exists and is a
+        // table...
+        String templateTableObjectType =
+            OracleQueries.getOracleObjectType(connection,
+                templateTableContext);
+        if (templateTableObjectType == null) {
+          throw new RuntimeException(String.format(
+              "The specified Oracle template table \"%s\" does not exist.",
+              templateTableContext.toString()));
+        }
+
+        if (!templateTableObjectType.equalsIgnoreCase(
+                OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE)) {
+          throw new RuntimeException(
+            String.format(
+                "The specified Oracle template table \"%s\" is not an "
+              + "Oracle table, it's a %s.",
+                templateTableContext.toString(), templateTableObjectType));
+        }
+
+        if (BooleanUtils.isTrue(jobConfig.dropTableIfExists)) {
+          OracleQueries.dropTable(connection, table);
+        }
+
+        // Check that there is no existing database object with the same
+        // name as the table to be created...
+        String newTableObjectType =
+            OracleQueries.getOracleObjectType(connection, table);
+        if (newTableObjectType != null) {
+          throw new RuntimeException(
+            String.format(
+                "%s cannot create a new Oracle table named %s as a \"%s\" "
+              + "with this name already exists.",
+              OracleJdbcConnectorConstants.CONNECTOR_NAME, table.toString(),
+              newTableObjectType));
+        }
+      } else {
+        // The export table already exists.
+
+        if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
+
+          // We're performing an "update" export, not an "insert" export.
+
+          // Check that an index exists on the export table covering the
+          // update-key column(s). Without such an index, this export may
+          // perform very poorly...
+          String[] updateKeyColumns =
+              OracleUtilities.getExportUpdateKeyColumnNames(jobConfig);
+          if (!OracleQueries.doesIndexOnColumnsExist(connection,
+              table, updateKeyColumns)) {
+            String msg = String.format(
+                "\n**************************************************************"
+              + "***************************************************************"
+              + "\n\tThe table %1$s does not have a valid index on "
+              + "the column(s) %2$s.\n"
+              + "\tAs a consequence, this export may take a long time to "
+              + "complete.\n"
+              + "\tIf performance is unacceptable, consider reattempting this "
+              + "job after creating an index "
+              + "on this table via the SQL...\n"
+              + "\t\tcreate index <index_name> on %1$s(%2$s);\n"
+              + "****************************************************************"
+              + "*************************************************************",
+                        table.toString(),
+                        OracleUtilities.stringArrayToCSV(updateKeyColumns));
+            LOG.warn(msg);
+          }
+        }
+      }
+
+      boolean createMapperTables =
+          updateKeyCol != null && !updateKeyCol.isEmpty();
+
+      if (OracleUtilities
+          .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)) {
+        /* ================================= */
+        /* CREATE A PARTITIONED TABLE */
+        /* ================================= */
+
+        // Create a new Oracle table using the specified template...
+
+        String[] subPartitionNames =
+            OracleUtilities.generateExportTableSubPartitionNames(numMappers,
+                sysDateTime);
+        // Create the export table from a template table...
+        String tableStorageClause =
+            OracleUtilities.getExportTableStorageClause(jobConfig);
+
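+        // The new table gets one partition for this job run and one
+        // subpartition per mapper. Each mapper stages rows into its own
+        // table (see createUniqueMapperTable below), which can then be
+        // exchanged into its subpartition without rewriting data -
+        // conceptually "ALTER TABLE ... EXCHANGE SUBPARTITION ... WITH
+        // TABLE ..."; the real statement is built by OracleQueries.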
+        OracleQueries.createExportTableFromTemplateWithPartitioning(
+            connection, table,
+            tableStorageClause, templateTableContext, noLoggingOnNewTable,
+            partitionName, sysDateTime, numMappers,
+            subPartitionNames);
+
+        createMapperTables = true;
+      } else if (OracleUtilities
+          .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {
+        /* ===================================== */
+        /* CREATE A NON-PARTITIONED TABLE */
+        /* ===================================== */
+        String tableStorageClause =
+            OracleUtilities.getExportTableStorageClause(jobConfig);
+
+        OracleQueries.createExportTableFromTemplate(connection,
+            table, tableStorageClause,
+            templateTableContext, noLoggingOnNewTable);
+      } else {
+        /* ===================================================== */
+        /* ADD ADDITIONAL PARTITIONS TO AN EXISTING TABLE */
+        /* ===================================================== */
+
+        // If the export table is partitioned, and the partitions were
+        // created by OraOop, then we need to create additional partitions...
+
+        OracleTablePartitions tablePartitions =
+            OracleQueries.getPartitions(connection, table);
+        // Find any partition name starting with "ORAOOP_"...
+        OracleTablePartition oraOopPartition =
+            tablePartitions.findPartitionByRegEx("^"
+                + OracleJdbcConnectorConstants.
+                    EXPORT_TABLE_PARTITION_NAME_PREFIX);
+
+        if (tablePartitions.size() > 0 && oraOopPartition == null) {
+
+          for (int idx = 0; idx < tablePartitions.size(); idx++) {
+            LOG.info(String.format(
+                    "The Oracle table %s has a partition named \"%s\".",
+                    table.toString(),
+                    tablePartitions.get(idx).getName()));
+          }
+
+          LOG.warn(String.format(
+                  "The Oracle table %s is partitioned.\n"
+                      + "These partitions were not created by %s.",
+                  table.toString(),
+                  OracleJdbcConnectorConstants.CONNECTOR_NAME));
+        }
+
+        if (oraOopPartition != null) {
+
+          // Indicate in the configuration what's happening...
+          context.setBoolean(OracleJdbcConnectorConstants.
+              EXPORT_TABLE_HAS_SQOOP_PARTITIONS, true);
+
+          LOG.info(String.format(
+                          "The Oracle table %s is partitioned.\n"
+                              + "These partitions were created by %s, so "
+                              + "additional partitions will now be created.\n"
+                              + "The name of the new partition will be \"%s\".",
+                          table.toString(), OracleJdbcConnectorConstants.
+                          CONNECTOR_NAME, partitionName));
+
+          String[] subPartitionNames =
+              OracleUtilities.generateExportTableSubPartitionNames(numMappers,
+                  sysDateTime);
+
+          // Add another partition (and N subpartitions) to this existing,
+          // partitioned export table...
+          OracleQueries.createMoreExportTablePartitions(connection,
+              table, partitionName,
+              sysDateTime, subPartitionNames);
+
+          createMapperTables = true;
+        }
+      }
+
+      if (createMapperTables) {
+        createUniqueMapperTable(sysDateTime, numMappers, jobConfig);
+      }
+    }
+
+  private void createUniqueMapperTable(Object sysDateTime,
+      int numMappers, ToJobConfig jobConfig)
+      throws SQLException {
+
+    // Mappers insert data into a unique table before either:
+    // - exchanging it into a subpartition of the 'real' export table; or
+    // - merging it into the 'real' export table.
+
+    for (int i = 0; i < numMappers; i++) {
+      OracleTable mapperTable =
+          OracleUtilities.generateExportTableMapperTableName(i,
+              sysDateTime, null);
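+      // The generated name encodes the mapper index and the job's SYSDATE
+      // timestamp, so mappers (and separate job runs) get distinct
+      // staging-table names.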
+
+      // If this mapper is being reattempted in response to a failure, we
+      // need to delete the temporary table created by the previous
+      // attempt...
+      OracleQueries.dropTable(connection, mapperTable);
+
+      String temporaryTableStorageClause =
+          OracleUtilities.getTemporaryTableStorageClause(jobConfig);
+
+      OracleQueries.createExportTableForMapper(connection,
+          mapperTable, temporaryTableStorageClause, table,
+          false); // <- addOraOopPartitionColumns
+
+      LOG.debug(String.format("Created temporary mapper table %s",
+          mapperTable.toString()));
+    }
+  }
+
+  private void checkForOldOraOopTemporaryOracleTables(Connection connection,
+      Object sysDateTime, String schema) {
+
+    try {
+
+      StringBuilder message = new StringBuilder();
+      message
+        .append(String.format(
+          "The following tables appear to be old temporary tables created by "
+        + "%s that have not been deleted.\n"
+        + "They are probably left over from jobs that encountered an error and "
+        + "could not clean up after themselves.\n"
+        + "You might want to drop these Oracle tables in order to reclaim "
+        + "Oracle storage space:\n",
+        OracleJdbcConnectorConstants.CONNECTOR_NAME));
+      boolean showMessage = false;
+
+      String generatedTableName =
+          OracleUtilities.generateExportTableMapperTableName(0, sysDateTime,
+              schema).getName();
+      generatedTableName = generatedTableName.replaceAll("[0-9]", "%");
+      generatedTableName =
+          OracleUtilities.replaceAll(generatedTableName, "%%", "%");
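+      // Replacing every digit with '%' and collapsing runs of '%' turns a
+      // concrete generated name into a SQL LIKE pattern - e.g. a name such
+      // as "SQOOP_MAPPER_20151105_3" (illustrative) becomes
+      // "SQOOP_MAPPER_%_%" - which matches the mapper tables of any
+      // previous job run.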
+      Date sysDate = OracleQueries.oraDATEToDate(sysDateTime);
+
+      List<OracleTable> tables =
+          OracleQueries.getTablesWithTableNameLike(connection, schema,
+              generatedTableName);
+
+      for (OracleTable oracleTable : tables) {
+        OracleUtilities.DecodedExportMapperTableName tableName =
+            OracleUtilities.decodeExportTableMapperTableName(oracleTable);
+        if (tableName != null) {
+          Date tableDate =
+              OracleQueries.oraDATEToDate(tableName.getTableDateTime());
+          // Use a floating-point divisor; with integer arithmetic the
+          // fractional part of the age-in-days would be truncated.
+          double daysApart =
+              (sysDate.getTime() - tableDate.getTime())
+                  / (1000.0 * 60 * 60 * 24);
+          if (daysApart > 1.0) {
+            showMessage = true;
+            message.append(String.format("\t%s\n", oracleTable.toString()));
+          }
+        }
+      }
+
+      if (showMessage) {
+        LOG.info(message.toString());
+      }
+    } catch (Exception ex) {
+      LOG.warn(String.format(
+              "%s was unable to check for the existance of old "
+                  + "temporary Oracle tables.\n" + "Error:\n%s",
+              OracleJdbcConnectorConstants.CONNECTOR_NAME, ex.toString()));
+    }
+  }
+
+  private boolean isSqoopTableAnOracleTable(Connection connection,
+      String connectionUserName, OracleTable tableContext) {
+
+    String oracleObjectType;
+
+    try {
+
+      // Find the table via dba_tables...
+      OracleTable oracleTable =
+          OracleQueries.getTable(connection, tableContext.getSchema(),
+              tableContext.getName());
+      if (oracleTable != null) {
+        return true;
+      }
+
+      // If we could not find the table via dba_tables, then try to determine
+      // what type of database object the user was referring to. Perhaps
+      // they've specified the name of a view?...
+      oracleObjectType =
+          OracleQueries.getOracleObjectType(connection, tableContext);
+
+      if (oracleObjectType == null) {
+        LOG.info(String.format(
+            "%1$s will not process this Sqoop connection, "
+          + "as the Oracle user %2$s does not own a table named %3$s.\n"
+          + "\tPlease prefix the table name with the owner.\n "
+          + "\tNote: You may need to double-quote the owner and/or table name."
+          + "\n\tE.g. sqoop ... --username %4$s --table %2$s.%3$s\n",
+          OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.getSchema(),
+          tableContext.getName(), connectionUserName));
+        return false;
+      }
+
+    } catch (SQLException ex) {
+      LOG.warn(String.format(
+        "Unable to determine the Oracle-type of the object named %s owned by "
+            + "%s.\nError:\n" + "%s", tableContext.getName(), tableContext
+            .getSchema(), ex.getMessage()));
+
+      // In the absence of conflicting information, let's assume the object is
+      // actually a table...
+      return true;
+    }
+
+    boolean result =
+        oracleObjectType.equalsIgnoreCase(
+            OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE);
+
+    if (!result) {
+      LOG.info(String.format("%s will not process this sqoop connection, "
+          + "as %s is not an Oracle table, it's a %s.",
+          OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.toString(),
+          oracleObjectType));
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
new file mode 100644
index 0000000..c355a77
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.StartsWith;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Link-level configuration describing how to connect to an Oracle database.
+ */
+@ConfigClass(validators = {@Validator(ConnectionConfig.ConfigValidator.class)})
+public class ConnectionConfig {
+  @Input(size = 128, validators = {@Validator(value = StartsWith.class, strArg = "jdbc:")} )
+  public String connectionString;
+
+  @Input(size = 40)
+  public String username;
+
+  @Input(size = 40, sensitive = true)
+  public String password;
+
+  @Input
+  public Map<String, String> jdbcProperties;
+
+  @Input
+  public String timeZone;
+
+  @Input
+  public String actionName;
+
+  @Input
+  public Integer fetchSize;
+
+  @Input
+  public String initializationStatements;
+
+  @Input
+  public Boolean jdbcUrlVerbatim;
+
+  @Input
+  public String racServiceName;
+
+  public static class ConfigValidator extends AbstractValidator<ConnectionConfig> {
+    @Override
+    public void validate(ConnectionConfig linkConfig) {
+      // See if we can connect to the database
+      try {
+        OracleConnectionFactory.makeConnection(linkConfig);
+      } catch (SQLException e) {
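+        // The failed probe is reported as a WARNING rather than an ERROR,
+        // so validation is not hard-failed merely because the database is
+        // temporarily unreachable.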
+        addMessage(Status.WARNING, "Can't connect to the database with the given credentials: " + e.getMessage());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
new file mode 100644
index 0000000..38c808f
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Job-level configuration for importing (FROM) an Oracle table.
+ */
+@ConfigClass
+public class FromJobConfig {
+
+  @Input(size = 2000, validators = { @Validator(NotEmpty.class)})
+  public String tableName;
+
+  @Input
+  public Boolean consistentRead;
+
+  @Input
+  public Long consistentReadScn;
+
+  @Input(size = 2000)
+  public String partitionList;
+
+  @Input(size = 2000)
+  public String dataChunkMethod;
+
+  @Input(size = 2000)
+  public String dataChunkAllocationMethod;
+
+  @Input(size = 2000)
+  public String whereClauseLocation;
+
+  @Input
+  public Boolean omitLobColumns;
+
+  @Input
+  public String queryHint;
+
+  @Input(size = 2000)
+  public String conditions;
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..6a6c1aa
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Configuration wrapper exposing {@link FromJobConfig} to the Sqoop framework.
+ */
+@ConfigurationClass
+public class FromJobConfiguration {
+  @Config public FromJobConfig fromJobConfig;
+
+  public FromJobConfiguration() {
+    fromJobConfig = new FromJobConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..990343b
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Configuration wrapper exposing {@link ConnectionConfig} to the Sqoop framework.
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+
+  @Config public ConnectionConfig connectionConfig;
+
+  public LinkConfiguration() {
+    connectionConfig = new ConnectionConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
new file mode 100644
index 0000000..939a87a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Job-level configuration for exporting (TO) an Oracle table.
+ */
+@ConfigClass
+public class ToJobConfig {
+
+  @Input(size = 2000, validators = { @Validator(NotEmpty.class)})
+  public String tableName;
+
+  @Input(size = 2000)
+  public String templateTable;
+
+  @Input
+  public Boolean partitioned;
+
+  @Input
+  public Boolean nologging;
+
+  @Input(size = 2000)
+  public String updateKey;
+
+  @Input
+  public Boolean updateMerge;
+
+  @Input
+  public Boolean dropTableIfExists;
+
+  @Input(size = 2000)
+  public String storageClause;
+
+  @Input(size = 2000)
+  public String temporaryStorageClause;
+
+  @Input(size = 2000)
+  public String appendValuesHint;
+
+  @Input
+  public Boolean parallel;
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..b34df1a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Configuration wrapper exposing {@link ToJobConfig} to the Sqoop framework.
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+  @Config public ToJobConfig toJobConfig;
+
+  public ToJobConfiguration() {
+    toJobConfig = new ToJobConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
new file mode 100644
index 0000000..b46bce5
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+/**
+ * Wraps data from v$active_instances.
+ */
+public class OracleActiveInstance {
+
+  private String instanceName;
+  private String hostName;
+
+  public String getInstanceName() {
+    return instanceName;
+  }
+
+  public void setInstanceName(String newInstanceName) {
+    this.instanceName = newInstanceName;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String newHostName) {
+    this.hostName = newHostName;
+  }
+}