Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/05 18:41:00 UTC

[6/6] sqoop git commit: SQOOP-2595: Add Oracle connector to Sqoop 2

SQOOP-2595: Add Oracle connector to Sqoop 2

(David Robson via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fa3c77b6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fa3c77b6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fa3c77b6

Branch: refs/heads/sqoop2
Commit: fa3c77b6a8352f68ec429164f48aee00ae2480d8
Parents: 2a9ae31
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Nov 5 09:40:31 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Nov 5 09:40:31 2015 -0800

----------------------------------------------------------------------
 connector/connector-oracle-jdbc/pom.xml         |  134 ++
 .../oracle/OracleJdbcCommonInitializer.java     |  477 +++++
 .../jdbc/oracle/OracleJdbcConnector.java        |   92 +
 .../oracle/OracleJdbcConnectorConstants.java    |  493 +++++
 .../oracle/OracleJdbcConnectorUpgrader.java     |   43 +
 .../jdbc/oracle/OracleJdbcExtractor.java        |  361 ++++
 .../jdbc/oracle/OracleJdbcFromDestroyer.java    |   36 +
 .../jdbc/oracle/OracleJdbcFromInitializer.java  |   90 +
 .../connector/jdbc/oracle/OracleJdbcLoader.java |  615 +++++++
 .../jdbc/oracle/OracleJdbcPartition.java        |  183 ++
 .../jdbc/oracle/OracleJdbcPartitioner.java      |  252 +++
 .../jdbc/oracle/OracleJdbcToDestroyer.java      |  273 +++
 .../jdbc/oracle/OracleJdbcToInitializer.java    |  498 +++++
 .../oracle/configuration/ConnectionConfig.java  |   78 +
 .../oracle/configuration/FromJobConfig.java     |   61 +
 .../configuration/FromJobConfiguration.java     |   33 +
 .../oracle/configuration/LinkConfiguration.java |   34 +
 .../jdbc/oracle/configuration/ToJobConfig.java  |   64 +
 .../configuration/ToJobConfiguration.java       |   33 +
 .../jdbc/oracle/util/OracleActiveInstance.java  |   44 +
 .../oracle/util/OracleConnectionFactory.java    |  246 +++
 .../jdbc/oracle/util/OracleDataChunk.java       |   48 +
 .../jdbc/oracle/util/OracleDataChunkExtent.java |  109 ++
 .../oracle/util/OracleDataChunkPartition.java   |   85 +
 .../jdbc/oracle/util/OracleGenerics.java        |   64 +
 .../jdbc/oracle/util/OracleJdbcUrl.java         |  244 +++
 .../jdbc/oracle/util/OracleQueries.java         | 1721 ++++++++++++++++++
 .../jdbc/oracle/util/OracleSqlTypesUtils.java   |  176 ++
 .../connector/jdbc/oracle/util/OracleTable.java |   68 +
 .../jdbc/oracle/util/OracleTableColumn.java     |   59 +
 .../jdbc/oracle/util/OracleTableColumns.java    |   43 +
 .../jdbc/oracle/util/OracleTablePartition.java  |   50 +
 .../jdbc/oracle/util/OracleTablePartitions.java |   62 +
 .../jdbc/oracle/util/OracleUtilities.java       | 1446 +++++++++++++++
 .../jdbc/oracle/util/OracleVersion.java         |   84 +
 .../oracle-jdbc-connector-config.properties     |  136 ++
 .../main/resources/sqoopconnector.properties    |   18 +
 .../jdbc/oracle/TestOracleJdbcPartitioner.java  |  102 ++
 .../jdbc/oracle/TestOracleJdbcUrl.java          |  249 +++
 .../connector/jdbc/oracle/TestOracleTable.java  |   42 +
 .../jdbc/oracle/TestOracleUtilities.java        |  613 +++++++
 .../OracleConnectionFactoryTest.java            |  497 +++++
 .../oracle/integration/OracleQueriesTest.java   |   49 +
 .../jdbc/oracle/integration/OracleTestCase.java |   41 +
 connector/pom.xml                               |    1 +
 pom.xml                                         |   11 +
 server/pom.xml                                  |    5 +
 47 files changed, 10163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/pom.xml b/connector/connector-oracle-jdbc/pom.xml
new file mode 100644
index 0000000..325790d
--- /dev/null
+++ b/connector/connector-oracle-jdbc/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>connector</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.connector</groupId>
+  <artifactId>sqoop-connector-oracle-jdbc</artifactId>
+  <name>Sqoop Oracle JDBC Connector</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <finalName>sqoop</finalName>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludedGroups>oracle</excludedGroups>
+
+          <excludes>
+            <exclude>**/integration/**</exclude>
+          </excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <id>integration-test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <phase>integration-test</phase>
+            <configuration>
+              <excludes>
+                <exclude>none</exclude>
+              </excludes>
+              <includes>
+                <include>**/integration/**</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>jdbc-oracle</id>
+
+      <activation>
+        <property>
+          <name>jdbc.oracle</name>
+        </property>
+      </activation>
+
+      <dependencies>
+        <dependency>
+          <groupId>com.oracle</groupId>
+          <artifactId>ojdbc14</artifactId>
+        </dependency>
+      </dependencies>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+
+            <configuration>
+              <excludedGroups>none</excludedGroups>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>

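The surefire configuration above keeps the default test phase fast: the TestNG group "oracle" and everything under **/integration/** are excluded, and the dedicated integration-test execution flips those filters. The jdbc-oracle profile, activated by defining the jdbc.oracle property (e.g. mvn integration-test -Djdbc.oracle=true), adds the ojdbc14 driver and clears the excluded group so the Oracle-dependent tests can run. A minimal sketch of a test that opts into that group follows; the class and assertion are hypothetical and not part of this commit:

import org.testng.Assert;
import org.testng.annotations.Test;

// Hypothetical example: the "oracle" group matches surefire's
// <excludedGroups>oracle</excludedGroups> entry above, so this test is
// skipped unless the jdbc-oracle profile resets the excluded groups.
public class OracleGroupSmokeTest {

  @Test(groups = "oracle")
  public void runsOnlyWithOracleProfile() {
    // A real test would open a JDBC connection here; this is a placeholder.
    Assert.assertTrue(true);
  }
}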
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
new file mode 100644
index 0000000..1fd95c0
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleActiveInstance;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleJdbcUrl;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.JdbcOracleThinConnectionParsingError;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+
+public class OracleJdbcCommonInitializer<JobConfiguration> extends Initializer<LinkConfiguration, JobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcCommonInitializer.class);
+
+  protected Connection connection;
+  protected OracleTable table;
+  protected int numMappers = 8;
+
+  public void connect(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration) throws SQLException {
+    connection = OracleConnectionFactory.makeConnection(
+        linkConfiguration.connectionConfig);
+  }
+
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration) {
+    showUserTheOraOopWelcomeMessage();
+
+    try {
+      connect(context, linkConfiguration, jobConfiguration);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\nError:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+
+    // Generate the "action" name that we'll assign to our Oracle sessions
+    // so that the user knows which Oracle sessions belong to OraOop...
+    //TODO: Get the job name
+    context.getContext().setString(
+        OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME,
+        getOracleSessionActionName(
+            linkConfiguration.connectionConfig.username));
+
+    //TODO: Don't think this can be done anymore
+    //OraOopUtilities.appendJavaSecurityEgd(sqoopOptions.getConf());
+
+    // Get the Oracle database version...
+    try {
+      OracleVersion oracleVersion =
+          OracleQueries.getOracleVersion(connection);
+      LOG.info(String.format("Oracle Database version: %s",
+          oracleVersion.getBanner()));
+    } catch (SQLException ex) {
+      LOG.error("Unable to obtain the Oracle database version.", ex);
+    }
+
+    // Generate the JDBC URLs to be used by each mapper...
+    setMapperConnectionDetails(linkConfiguration.connectionConfig,
+        context.getContext());
+
+    // Show the user the Oracle command that can be used to kill this
+    // OraOop job via Oracle...
+    showUserTheOracleCommandToKillOraOop(context.getContext());
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      JobConfiguration jobConfiguration) {
+    try {
+      connect(context, linkConfiguration, jobConfiguration);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\n"
+              + "Error:%s", linkConfiguration.connectionConfig.connectionString,
+              ex.getMessage()), ex);
+    }
+
+    Schema schema = new Schema(table.toString());
+
+    try {
+      List<String> colNames = OracleQueries.getToTableColumnNames(
+          connection, table, true, true);
+
+      List<Column> columnTypes =
+            OracleQueries.getColDataTypes(connection, table, colNames);
+
+      for(Column column : columnTypes) {
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch(Exception e) {
+      throw new RuntimeException(
+          "Could not determine columns in Oracle Table.", e);
+    }
+  }
+
+  private void showUserTheOraOopWelcomeMessage() {
+
+    String msg1 =
+        String.format("Using %s",
+            OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME);
+
+    int longestMessage = msg1.length();
+
+    msg1 = StringUtils.rightPad(msg1, longestMessage);
+
+    char[] asterisks = new char[longestMessage + 8];
+    Arrays.fill(asterisks, '*');
+
+    String msg =
+        String.format("\n" + "%1$s\n" + "*** %2$s ***\n" + "%1$s", new String(
+            asterisks), msg1);
+    LOG.info(msg);
+  }
+
+  private String getOracleSessionActionName(String jobName) {
+
+    String timeStr =
+        (new SimpleDateFormat("yyyyMMddHHmmsszzz")).format(new Date());
+
+    String result = String.format("%s %s", jobName, timeStr);
+
+    // NOTE: The "action" column of v$session is only a 32 character column.
+    // Therefore we need to ensure that the string returned by this
+    // method does not exceed 32 characters...
+    if (result.length() > 32) {
+      result = result.substring(0, 32).trim();
+    }
+
+    return result;
+  }
+
+  private void setMapperConnectionDetails(ConnectionConfig connectionConfig,
+      MutableContext context) {
+
+    // Query v$active_instances to get a list of all instances in the Oracle RAC
+    // (assuming this *could* be a RAC)...
+    List<OracleActiveInstance> activeInstances = null;
+    try {
+      activeInstances =
+          OracleQueries.getOracleActiveInstances(connection);
+    } catch (SQLException ex) {
+      throw new RuntimeException(
+          "An error was encountered when attempting to determine the "
+        + "configuration of the Oracle RAC.",
+        ex);
+    }
+
+    if (activeInstances == null) {
+      LOG.info("This Oracle database is not a RAC.");
+    } else {
+      LOG.info("This Oracle database is a RAC.");
+    }
+
+    // Is dynamic JDBC URL generation disabled?...
+    if (OracleUtilities.oracleJdbcUrlGenerationDisabled(connectionConfig)) {
+      LOG.info(String
+          .format(
+              "%s will not use dynamically generated JDBC URLs - this feature "
+            + "has been disabled.",
+            OracleJdbcConnectorConstants.CONNECTOR_NAME));
+      return;
+    }
+
+    boolean generateRacBasedJdbcUrls = false;
+
+    // Decide whether this is a multi-instance RAC, and whether we need to do
+    // anything more...
+    if (activeInstances != null) {
+      generateRacBasedJdbcUrls = true;
+
+      if (activeInstances.size() < OracleJdbcConnectorConstants.
+          MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) {
+        LOG.info(String.format(
+            "There are only %d active instances in the Oracle RAC. "
+          + "%s will not bother utilizing dynamically generated JDBC URLs.",
+          activeInstances.size(),
+          OracleJdbcConnectorConstants.CONNECTOR_NAME));
+        generateRacBasedJdbcUrls = false;
+      }
+    }
+
+    // E.g. jdbc:oracle:thin:@localhost.localdomain:1521:orcl
+    String jdbcConnectStr = connectionConfig.connectionString;
+
+    // Parse the JDBC URL to obtain the port number for the TNS listener...
+    String jdbcHost = "";
+    int jdbcPort = 0;
+    String jdbcSid = "";
+    String jdbcService = "";
+    String jdbcTnsName = "";
+    try {
+
+      OracleJdbcUrl oraOopJdbcUrl = new OracleJdbcUrl(jdbcConnectStr);
+      OracleUtilities.JdbcOracleThinConnection jdbcConnection =
+          oraOopJdbcUrl.parseJdbcOracleThinConnectionString();
+      jdbcHost = jdbcConnection.getHost();
+      jdbcPort = jdbcConnection.getPort();
+      jdbcSid = jdbcConnection.getSid();
+      jdbcService = jdbcConnection.getService();
+      jdbcTnsName = jdbcConnection.getTnsName();
+    } catch (JdbcOracleThinConnectionParsingError ex) {
+      LOG.info(String.format(
+          "Unable to parse the JDBC connection URL \"%s\" as a connection "
+        + "that uses the Oracle 'thin' JDBC driver.\n"
+        + "This problem prevents %s from being able to dynamically generate "
+        + "JDBC URLs that specify 'dedicated server connections' or spread "
+        + "mapper sessions across multiple Oracle instances.\n"
+        + "If the JDBC driver-type is 'OCI' (instead of 'thin'), then "
+        + "load-balancing should be appropriately managed automatically.",
+              jdbcConnectStr, OracleJdbcConnectorConstants.CONNECTOR_NAME, ex));
+      return;
+    }
+
+    if (generateRacBasedJdbcUrls) {
+
+      // Retrieve the Oracle service name to use when connecting to the RAC...
+      String oracleServiceName = connectionConfig.racServiceName;
+
+      // Generate JDBC URLs for each of the mappers...
+      if (!oracleServiceName.isEmpty()) {
+        if (!generateRacJdbcConnectionUrlsByServiceName(jdbcHost, jdbcPort,
+            oracleServiceName, connectionConfig, context)) {
+          throw new RuntimeException(String.format(
+              "Unable to connect to the Oracle database at %s "
+                  + "via the service name \"%s\".", jdbcConnectStr,
+                  oracleServiceName));
+        }
+      } else {
+        generateJdbcConnectionUrlsByActiveInstance(activeInstances, jdbcPort,
+            connectionConfig, context);
+      }
+    } else {
+      generateJdbcConnectionUrlsByTnsnameSidOrService(jdbcHost, jdbcPort,
+        jdbcSid, jdbcService, jdbcTnsName, connectionConfig, context);
+    }
+
+  }
+
+  private boolean generateRacJdbcConnectionUrlsByServiceName(String hostName,
+      int port, String serviceName, ConnectionConfig connectionConfig,
+      MutableContext context) {
+
+    boolean result = false;
+    String jdbcUrl =
+        OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
+            serviceName);
+
+    if (testDynamicallyGeneratedOracleRacInstanceConnection(jdbcUrl,
+        connectionConfig.username, connectionConfig.password,
+        connectionConfig.jdbcProperties
+        , false // <- ShowInstanceSysTimestamp
+        , "" // <- instanceDescription
+    )) {
+
+      LOG.info(String.format(
+          "%s will load-balance sessions across the Oracle RAC instances "
+              + "by connecting each mapper to the Oracle Service \"%s\".",
+          OracleJdbcConnectorConstants.CONNECTOR_NAME, serviceName));
+
+      // Now store these connection strings in such a way that each mapper knows
+      // which one to use...
+      for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+        storeJdbcUrlForMapper(idxMapper, jdbcUrl, context);
+      }
+      result = true;
+    }
+    return result;
+  }
+
+  private void generateJdbcConnectionUrlsByTnsnameSidOrService(String hostName,
+      int port, String sid, String serviceName, String tnsName,
+      ConnectionConfig connectionConfig, MutableContext context) {
+
+      String jdbcUrl = null;
+      if (tnsName != null && !tnsName.isEmpty()) {
+        jdbcUrl = OracleUtilities.generateOracleTnsNameJdbcUrl(tnsName);
+      } else if (sid != null && !sid.isEmpty()) {
+        jdbcUrl = OracleUtilities.generateOracleSidJdbcUrl(hostName, port, sid);
+      } else {
+        jdbcUrl =
+            OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
+                serviceName);
+      }
+
+      // Now store these connection strings in such a way that each mapper knows
+      // which one to use...
+      for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+        storeJdbcUrlForMapper(idxMapper, jdbcUrl, context);
+      }
+    }
+
+    private void
+        generateJdbcConnectionUrlsByActiveInstance(
+            List<OracleActiveInstance> activeInstances, int jdbcPort,
+            ConnectionConfig connectionConfig, MutableContext context) {
+
+      // Generate JDBC URLs for each of the instances in the RAC...
+      ArrayList<OracleUtilities.JdbcOracleThinConnection>
+          jdbcOracleActiveThinConnections =
+              new ArrayList<OracleUtilities.JdbcOracleThinConnection>(
+                  activeInstances.size());
+
+      for (OracleActiveInstance activeInstance : activeInstances) {
+
+        OracleUtilities.JdbcOracleThinConnection
+            jdbcActiveInstanceThinConnection =
+                new OracleUtilities.JdbcOracleThinConnection(
+                    activeInstance.getHostName(),
+                    jdbcPort, activeInstance.getInstanceName(), "", "");
+
+        if (testDynamicallyGeneratedOracleRacInstanceConnection(
+            jdbcActiveInstanceThinConnection.toString(),
+            connectionConfig.username,
+            connectionConfig.password, connectionConfig.jdbcProperties,
+            true, activeInstance.getInstanceName())) {
+          jdbcOracleActiveThinConnections.add(jdbcActiveInstanceThinConnection);
+        }
+      }
+
+      // If there are multiple JDBC URLs that work okay for the RAC, then we'll
+      // make use of them...
+      if (jdbcOracleActiveThinConnections.size() < OracleJdbcConnectorConstants.
+          MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) {
+        LOG.info(String
+            .format(
+                "%s will not attempt to load-balance sessions across instances "
+              + "of an Oracle RAC - as multiple JDBC URLs to the "
+              + "Oracle RAC could not be dynamically generated.",
+              OracleJdbcConnectorConstants.CONNECTOR_NAME));
+        return;
+      } else {
+        StringBuilder msg = new StringBuilder();
+        msg.append(String
+            .format(
+                "%s will load-balance sessions across the following instances "
+              + "of the Oracle RAC:\n",
+              OracleJdbcConnectorConstants.CONNECTOR_NAME));
+
+        for (OracleUtilities.JdbcOracleThinConnection thinConnection
+                 : jdbcOracleActiveThinConnections) {
+          msg.append(String.format("\tInstance: %s \t URL: %s\n",
+              thinConnection.getSid(), thinConnection.toString()));
+        }
+        LOG.info(msg.toString());
+      }
+
+      // Now store these connection strings in such a way that each mapper knows
+      // which one to use...
+      int racInstanceIdx = 0;
+      OracleUtilities.JdbcOracleThinConnection thinUrl;
+      for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+        if (racInstanceIdx > jdbcOracleActiveThinConnections.size() - 1) {
+          racInstanceIdx = 0;
+        }
+        thinUrl = jdbcOracleActiveThinConnections.get(racInstanceIdx);
+        racInstanceIdx++;
+        storeJdbcUrlForMapper(idxMapper, thinUrl.toString(), context);
+      }
+    }
+
+    private void storeJdbcUrlForMapper(int mapperIdx, String jdbcUrl,
+        MutableContext context) {
+
+      // Now store these connection strings in such a way that each mapper knows
+      // which one to use...
+      String mapperJdbcUrlPropertyName =
+          OracleUtilities.getMapperJdbcUrlPropertyName(mapperIdx);
+      LOG.debug("Setting mapper url " + mapperJdbcUrlPropertyName + " = "
+        + jdbcUrl);
+      context.setString(mapperJdbcUrlPropertyName, jdbcUrl);
+    }
+
+    private boolean testDynamicallyGeneratedOracleRacInstanceConnection(
+        String url, String userName, String password,
+        Map<String, String> jdbcProperties,
+        boolean showInstanceSysTimestamp, String instanceDescription) {
+
+      boolean result = false;
+
+      // Test the connection...
+      try {
+        Properties additionalProps = new Properties();
+        if(jdbcProperties != null) {
+          additionalProps.putAll(jdbcProperties);
+        }
+        Connection testConnection =
+            OracleConnectionFactory.createOracleJdbcConnection(
+                OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+                url, userName, password, additionalProps);
+
+        // Show the system time on each instance...
+        if (showInstanceSysTimestamp) {
+          LOG.info(String.format("\tDatabase time on %s is %s",
+              instanceDescription, OracleQueries
+                  .getSysTimeStamp(testConnection)));
+        }
+
+        testConnection.close();
+        result = true;
+      } catch (SQLException ex) {
+        LOG.warn(
+            String
+                .format(
+                    "The dynamically generated JDBC URL \"%s\" was unable to "
+                    + "connect to an instance in the Oracle RAC.",
+                    url), ex);
+      }
+
+      return result;
+    }
+
+    private void showUserTheOracleCommandToKillOraOop(MutableContext context) {
+
+      String moduleName =
+          OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME;
+      String actionName = context.getString(
+          OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME);
+
+      String msg = String.format(
+          "\nNote: This %s job can be killed via Oracle by executing the "
+        + "following statement:\n\tbegin\n"
+        + "\t\tfor row in (select sid,serial# from v$session where module='%s' "
+        + "and action='%s') loop\n"
+        + "\t\t\texecute immediate 'alter system kill session ''' || row.sid || "
+        + "',' || row.serial# || '''';\n"
+        + "\t\tend loop;\n" + "\tend;",
+        OracleJdbcConnectorConstants.CONNECTOR_NAME, moduleName, actionName);
+      LOG.info(msg);
+    }
+}

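When it is not generating RAC-specific URLs, the initializer above chooses between three standard Oracle thin-driver URL shapes: a TNS alias, a SID, or a service name (in that order of preference, per generateJdbcConnectionUrlsByTnsnameSidOrService). A minimal sketch of those conventional URL forms, with made-up host and database names, assuming the OracleUtilities helpers defined later in this commit emit the standard formats:

public class OracleThinUrlShapes {
  public static void main(String[] args) {
    // Conventional Oracle thin JDBC URL forms (values are illustrative only):
    String bySid = "jdbc:oracle:thin:@dbhost:1521:orcl";           // host:port:SID
    String byService = "jdbc:oracle:thin:@//dbhost:1521/orcl.svc"; // //host:port/service_name
    String byTnsName = "jdbc:oracle:thin:@ORCL_ALIAS";             // alias resolved via tnsnames.ora

    // One such URL is stored per mapper via storeJdbcUrlForMapper(), keyed by
    // OracleUtilities.getMapperJdbcUrlPropertyName(mapperIdx), so each mapper
    // later reads its own connection string from the job context.
    System.out.println(bySid);
    System.out.println(byService);
    System.out.println(byTnsName);
  }
}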
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
new file mode 100644
index 0000000..ae0b9dc
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+
+public class OracleJdbcConnector extends SqoopConnector {
+
+  private static final To TO = new To(
+      OracleJdbcToInitializer.class,
+      OracleJdbcLoader.class,
+      OracleJdbcToDestroyer.class);
+
+  private static final From FROM = new From(
+      OracleJdbcFromInitializer.class,
+      OracleJdbcPartitioner.class,
+      OracleJdbcPartition.class,
+      OracleJdbcExtractor.class,
+      OracleJdbcFromDestroyer.class);
+
+  @Override
+  public String getVersion() {
+    return VersionInfo.getBuildVersion();
+  }
+
+  @Override
+  public ResourceBundle getBundle(Locale locale) {
+    return ResourceBundle.getBundle(
+        OracleJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class getLinkConfigurationClass() {
+    return LinkConfiguration.class;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class getJobConfigurationClass(Direction jobType) {
+    switch (jobType) {
+      case FROM:
+        return FromJobConfiguration.class;
+      case TO:
+        return ToJobConfiguration.class;
+      default:
+        return null;
+    }
+  }
+
+  @Override
+  public From getFrom() {
+    return FROM;
+  }
+
+  @Override
+  public To getTo() {
+    return TO;
+  }
+
+  @Override
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
+    return new OracleJdbcConnectorUpgrader();
+  }
+
+}

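Since OracleJdbcConnector only wires the direction-specific classes together, a short sketch of how a caller could resolve the configuration classes it exposes may help; this consumer class is hypothetical and uses only types that appear in this commit or in the Sqoop SPI:

import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.jdbc.oracle.OracleJdbcConnector;
import org.apache.sqoop.connector.spi.SqoopConnector;

public class OracleConnectorConfigLookup {
  public static void main(String[] args) {
    SqoopConnector connector = new OracleJdbcConnector();

    // FROM jobs are configured with FromJobConfiguration, TO jobs with
    // ToJobConfiguration, and both share LinkConfiguration for the link.
    Class<?> linkConfig = connector.getLinkConfigurationClass();
    Class<?> fromConfig = connector.getJobConfigurationClass(Direction.FROM);
    Class<?> toConfig = connector.getJobConfigurationClass(Direction.TO);

    System.out.println(linkConfig.getSimpleName()); // LinkConfiguration
    System.out.println(fromConfig.getSimpleName()); // FromJobConfiguration
    System.out.println(toConfig.getSimpleName());   // ToJobConfiguration
  }
}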
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
new file mode 100644
index 0000000..2215cf3
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+public final class OracleJdbcConnectorConstants {
+
+  // Resource bundle name
+  public static final String RESOURCE_BUNDLE_NAME =
+      "oracle-jdbc-connector-config";
+
+  public static final String CONNECTOR_NAME = "Sqoop Oracle Connector";
+
+  // The string we want to pass to dbms_application_info.set_module() via the
+  // "module_name" parameter...
+  public static final String ORACLE_SESSION_MODULE_NAME = CONNECTOR_NAME;
+
+  public static final String ORACLE_SESSION_ACTION_NAME =
+      "oracle.session.module.action";
+
+  // How many rows to pre-fetch when executing Oracle queries...
+  public static final int ORACLE_ROW_FETCH_SIZE_DEFAULT = 5000;
+
+  // The name of the Oracle JDBC class...
+  public static final String ORACLE_JDBC_DRIVER_CLASS =
+      "oracle.jdbc.OracleDriver";
+
+  public static final String ORACLE_SESSION_INITIALIZATION_STATEMENTS_DEFAULT =
+      "alter session disable parallel query;" +
+      "alter session set \"_serial_direct_read\"=true;" +
+      "alter session set tracefile_identifier=oraoop;" +
+      "--alter session set events '10046 trace name context forever, level 8';";
+
+
+  /////////////////////////////////////////////////////////////////////
+
+//  // Whether to log Oracle session statistics using Guy Harrison's jar file...
+//  public static final String ORAOOP_REPORT_SESSION_STATISTICS =
+//      "oraoop.report.session.statistics";
+//
+//  // Disables dynamic JDBC URL generation for each mapper...
+//  public static final String ORAOOP_JDBC_URL_VERBATIM =
+//      "oraoop.jdbc.url.verbatim";
+//
+//  // The name of the Oracle RAC service each mapper should connect to, via their
+//  // dynamically generated JDBC URL...
+//  public static final String ORAOOP_ORACLE_RAC_SERVICE_NAME =
+//      "oraoop.oracle.rac.service.name";
+//
+//  // The log4j log-level for OraOop...
+//  public static final String ORAOOP_LOGGING_LEVEL = "oraoop.logging.level";
+//
+//  // The file names for the configuration properties of OraOop...
+//  public static final String ORAOOP_SITE_TEMPLATE_FILENAME =
+//      "oraoop-site-template.xml";
+//  public static final String ORAOOP_SITE_FILENAME = "oraoop-site.xml";
+//
+//  // A flag that indicates that the OraOop job has been cancelled.
+//  // E.g. An Oracle DBA killed our Oracle session.
+//  // public static final String ORAOOP_JOB_CANCELLED = "oraoop.job.cancelled";
+//
+  // The SYSDATE from the Oracle database when this OraOop job was started.
+  // This is used to generate unique names for partitions and temporary tables
+  // that we create during the job...
+  public static final String SQOOP_ORACLE_JOB_SYSDATE =
+      "sqoop.oracle.job.sysdate";
+//
+//  // The properties are used internally by OraOop to indicate the schema and
+//  // name of
+//  // the table being imported/exported...
+//  public static final String ORAOOP_TABLE_OWNER = "oraoop.table.owner";
+//  public static final String ORAOOP_TABLE_NAME = "oraoop.table.name";
+//
+//  // Constants used to indicate the desired location of the WHERE clause within
+//  // the SQL generated by the record-reader.
+//  // E.g. A WHERE clause like "rownum <= 10" would want to be located so that
+//  // it had an impact on the total number of rows returned by the split;
+//  // as opposed to impacting the number of rows returned for each of the
+//  // unioned data-chunks within each split.
+//  public static final String ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION =
+//      "oraoop.table.import.where.clause.location";
+//
+//
+//  // Reliably stores the number of mappers requested for the sqoop map-reduce
+//  // job...
+//  public static final String ORAOOP_DESIRED_NUMBER_OF_MAPPERS =
+//      "oraoop.desired.num.mappers";
+//
+//  // The minimum number of mappers required for OraOop to accept the import
+//  // job...
+//  public static final String ORAOOP_MIN_IMPORT_MAPPERS =
+//      "oraoop.min.import.mappers";
+//  public static final int MIN_NUM_IMPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+//
+//  // The minimum number of mappers required for OraOop to accept the export
+//  // job...
+//  public static final String ORAOOP_MIN_EXPORT_MAPPERS =
+//      "oraoop.min.export.mappers";
+//  public static final int MIN_NUM_EXPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+//
+//  // The query used to fetch oracle data chunks...
+//  public static final String ORAOOP_ORACLE_DATA_CHUNKS_QUERY =
+//      "oraoop.oracle.data.chunks.query";
+//
+  // The minimum number of active instances in an Oracle RAC required for OraOop
+  // to use dynamically generated JDBC URLs...
+  public static final int MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS =
+      2;
+//
+//
+//
+//  // OraOop does not require a "--split-by" column to be defined...
+//  public static final String TABLE_SPLIT_COLUMN_NOT_REQUIRED = "not-required";
+//
+  // The name of the data_chunk_id column that OraOop appends to each (import)
+  // query...
+  public static final String COLUMN_NAME_DATA_CHUNK_ID = "data_chunk_id";
+//
+//  // The hint that will be used on the SELECT statement for import jobs
+//  public static final String IMPORT_QUERY_HINT = "oraoop.import.hint";
+//
+//  // Pseudo-columns added to a partitioned export table (created by OraOop from
+//  // a template table)
+//  // to store the partition value and subpartition value. The partition value is
+//  // the sysdate when
+//  // the job was performed. The subpartition value is the mapper index...
+  public static final String COLUMN_NAME_EXPORT_PARTITION =
+      "SQOOP_EXPORT_SYSDATE";
+  public static final String COLUMN_NAME_EXPORT_SUBPARTITION =
+      "SQOOP_MAPPER_ID";
+  public static final String COLUMN_NAME_EXPORT_MAPPER_ROW =
+      "SQOOP_MAPPER_ROW";
+
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_VALUE =
+      "oraoop.export.partition.date.value";
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_FORMAT =
+      "yyyy-mm-dd hh24:mi:ss";
+//
+//
+//
+//  // Boolean whether to do a consistent read based off an SCN
+//  public static final String ORAOOP_IMPORT_CONSISTENT_READ =
+//      "oraoop.import.consistent.read";
+//
+  // The SCN number to use for the consistent read
+  public static final String ORACLE_IMPORT_CONSISTENT_READ_SCN =
+      "oracle.import.consistent.read.scn";
+//
+//  // The method that will be used to create data chunks - ROWID ranges or
+//  // partitions
+//  public static final String ORAOOP_ORACLE_DATA_CHUNK_METHOD =
+//      "oraoop.chunk.method";
+
+//  // List of partitions to be imported, comma-separated list
+//  public static final String ORAOOP_IMPORT_PARTITION_LIST =
+//      "oraoop.import.partitions";
+//
+//  public static final OraOopOracleDataChunkMethod
+//                          ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT =
+//                              OraOopOracleDataChunkMethod.ROWID;
+//
+//  // How to allocate data-chunks into splits...
+//  public static final String ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD =
+//      "oraoop.block.allocation";
+//
+
+//
+//  // Whether to omit LOB and LONG columns during an import...
+//  public static final String ORAOOP_IMPORT_OMIT_LOBS_AND_LONG =
+//      "oraoop.import.omit.lobs.and.long";
+//
+//  // Identifies an existing Oracle table used to create a new table as the
+//  // destination of a Sqoop export.
+//  // Hence, use of this property implies that the "-table" does not exist in
+//  // Oracle and OraOop should create it.
+//  public static final String ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE =
+//      "oraoop.template.table";
+//
+//  // If the table that we want to create already exists, should we drop it?...
+//  public static final String ORAOOP_EXPORT_CREATE_TABLE_DROP =
+//      "oraoop.drop.table";
+//
+//  // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+//  // indicates whether the created Oracle
+//  // tables should have NOLOGGING...
+//  public static final String ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING =
+//      "oraoop.no.logging";
+//
+//  // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+//  // indicates whether the created Oracle
+//  // tables should be partitioned by job and mapper...
+//  public static final String ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED =
+//      "oraoop.partitioned";
+//
+  // Indicates (internally) that the export table we're dealing with has been
+  // partitioned by Sqoop...
+  public static final String EXPORT_TABLE_HAS_SQOOP_PARTITIONS =
+      "sqoop.export.table.has.sqoop.partitions";
+
+  // When using the Oracle hint... /*+ APPEND_VALUES */ ...a commit must be
+  // performed after each batch insert.
+  // Therefore, the batches need to be quite large to avoid a performance
+  // penalty (for the 'extra' commits).
+  // This is the minimum batch size to use under these conditions...
+//  public static final String ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE =
+//      "oraoop.min.append.values.batch.size";
+  public static final int MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT = 5000;
+//
+//  // The version of the Oracle database we're connected to...
+//  public static final String ORAOOP_ORACLE_DATABASE_VERSION_MAJOR =
+//      "oraoop.oracle.database.version.major";
+//  public static final String ORAOOP_ORACLE_DATABASE_VERSION_MINOR =
+//      "oraoop.oracle.database.version.minor";
+//
+//  // When OraOop creates a table for a Sqoop export (from a template table) and
+//  // the table contains partitions,
+//  // this is the prefix of those partition names. (This also allows us to later
+//  // identify partitions that OraOop
+//  // created.)
+  public static final String EXPORT_TABLE_PARTITION_NAME_PREFIX = "SQOOP_";
+
+  // When OraOop creates temporary tables for each mapper during a Sqoop export
+  // this is the prefix of table names...
+  public static final String EXPORT_MAPPER_TABLE_NAME_PREFIX = "SQOOP_";
+
+  // The format string used to turn a DATE into a string for use within the
+  // names of Oracle objects
+  // that we create. For example, temporary tables, table partitions, table
+  // subpartitions...
+  public static final String ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING =
+      "yyyymmdd_hh24miss";
+
+//  // Indicates whether to perform a "merge" operation when performing a Sqoop
+//  // export.
+//  // If false, 'insert' statements will be used (i.e. no 'updates')...
+//  public static final String ORAOOP_EXPORT_MERGE = "oraoop.export.merge";
+//
+//  // This property allows the user to enable parallelization during exports...
+//  public static final String ORAOOP_EXPORT_PARALLEL =
+//      "oraoop.export.oracle.parallelization.enabled";
+//
+//  // Flag used to indicate that the Oracle table contains at least one column of
+//  // type BINARY_DOUBLE...
+//  public static final String TABLE_CONTAINS_BINARY_DOUBLE_COLUMN =
+//      "oraoop.table.contains.binary.double.column";
+//  // Flag used to indicate that the Oracle table contains at least one column of
+//  // type BINARY_FLOAT...
+//  public static final String TABLE_CONTAINS_BINARY_FLOAT_COLUMN =
+//      "oraoop.table.contains.binary.float.column";
+//
+//  // The storage clause to append to the end of any CREATE TABLE statements we
+//  // execute for temporary Oracle tables...
+//  public static final String ORAOOP_TEMPORARY_TABLE_STORAGE_CLAUSE =
+//      "oraoop.temporary.table.storage.clause";
+//
+//  // The storage clause to append to the end of any CREATE TABLE statements we
+//  // execute for permanent (export) Oracle tables...
+//  public static final String ORAOOP_EXPORT_TABLE_STORAGE_CLAUSE =
+//      "oraoop.table.storage.clause";
+//
+//  // Additional columns to include with the --update-key column...
+//  public static final String ORAOOP_UPDATE_KEY_EXTRA_COLUMNS =
+//      "oraoop.update.key.extra.columns";
+//
+//  // Should OraOop map Timestamps as java.sql.Timestamp as Sqoop does, or as
+//  // String
+//  public static final String ORAOOP_MAP_TIMESTAMP_AS_STRING =
+//      "oraoop.timestamp.string";
+//  public static final boolean ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT = true;
+//
+//  // This flag allows the user to force use of the APPEND_VALUES Oracle hint
+//  // either ON, OFF or AUTO...
+//  public static final String ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE =
+//      "oraoop.oracle.append.values.hint.usage";
+//
+  /**
+   * Whether to use the append values hint for exports.
+   */
+  public enum AppendValuesHintUsage {
+    AUTO, ON, OFF
+  }
+
+  // http://download.oracle.com/docs/cd/E11882_01/server.112/e17118/
+  //     sql_elements001.htm#i45441
+  public static final String SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+          +
+          // "'BFILE',"+
+          "'BINARY_DOUBLE',"
+          + "'BINARY_FLOAT',"
+          + "'BLOB',"
+          + "'CHAR',"
+          + "'CLOB',"
+          + "'DATE',"
+          + "'FLOAT',"
+          + "'LONG',"
+          +
+          // "'LONG RAW',"+
+          // "'MLSLABEL',"+
+          "'NCHAR',"
+          + "'NCLOB',"
+          + "'NUMBER',"
+          + "'NVARCHAR2',"
+          + "'RAW',"
+          + "'ROWID',"
+          +
+          // "'UNDEFINED',"+
+          "'URITYPE',"
+          +
+          // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+          // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+          "'VARCHAR2'"
+          + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+            // dba_tab_columns
+          // "'XMLTYPE',"+
+          ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+          + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+  public static final String SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+          +
+          // "'BFILE',"+
+          "'BINARY_DOUBLE',"
+          + "'BINARY_FLOAT',"
+          +
+          // "'BLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'CHAR',"
+          +
+          // "'CLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'DATE',"
+          + "'FLOAT',"
+          +
+          // "'LONG',"+ //<- "create table as select..." and
+          // "insert into table as select..." do not work when a long column
+          // exists.
+          // "'LONG RAW',"+
+          // "'MLSLABEL',"+
+          "'NCHAR',"
+          +
+          // "'NCLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+          "'NUMBER',"
+          + "'NVARCHAR2',"
+          +
+          // "'RAW',"+
+          "'ROWID',"
+          +
+          // "'UNDEFINED',"+
+          "'URITYPE',"
+          +
+          // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+          // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+          "'VARCHAR2'"
+          + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+            // dba_tab_columns
+          // "'XMLTYPE',"+
+          ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+          + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+          + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+//  // Query to get current logged on user
+//  public static final String QUERY_GET_SESSION_USER = "SELECT USER FROM DUAL";
+//
+//  // public static final int[] SUPPORTED_ORACLE_DATA_TYPES = {
+//  // oracle.jdbc.OracleTypes.BIT // -7;
+//  // ,oracle.jdbc.OracleTypes.TINYINT // -6;
+//  // ,oracle.jdbc.OracleTypes.SMALLINT // 5;
+//  // ,oracle.jdbc.OracleTypes.INTEGER // 4;
+//  // ,oracle.jdbc.OracleTypes.BIGINT // -5;
+//  // ,oracle.jdbc.OracleTypes.FLOAT // 6;
+//  // ,oracle.jdbc.OracleTypes.REAL // 7;
+//  // ,oracle.jdbc.OracleTypes.DOUBLE // 8;
+//  // ,oracle.jdbc.OracleTypes.NUMERIC // 2;
+//  // ,oracle.jdbc.OracleTypes.DECIMAL // 3;
+//  // ,oracle.jdbc.OracleTypes.CHAR // 1;
+//  // ,oracle.jdbc.OracleTypes.VARCHAR // 12;
+//  // ,oracle.jdbc.OracleTypes.LONGVARCHAR // -1;
+//  // ,oracle.jdbc.OracleTypes.DATE // 91;
+//  // ,oracle.jdbc.OracleTypes.TIME // 92;
+//  // ,oracle.jdbc.OracleTypes.TIMESTAMP // 93;
+//  // // ,oracle.jdbc.OracleTypes.TIMESTAMPNS // -100; //<- Deprecated
+//  // ,oracle.jdbc.OracleTypes.TIMESTAMPTZ // -101;
+//  // ,oracle.jdbc.OracleTypes.TIMESTAMPLTZ // -102;
+//  // ,oracle.jdbc.OracleTypes.INTERVALYM // -103;
+//  // ,oracle.jdbc.OracleTypes.INTERVALDS // -104;
+//  // ,oracle.jdbc.OracleTypes.BINARY // -2;
+//  // /// ,oracle.jdbc.OracleTypes.VARBINARY // -3;
+//  // ,oracle.jdbc.OracleTypes.LONGVARBINARY // -4;
+//  // ,oracle.jdbc.OracleTypes.ROWID // -8;
+//  // ,oracle.jdbc.OracleTypes.CURSOR // -10;
+//  // ,oracle.jdbc.OracleTypes.BLOB // 2004;
+//  // ,oracle.jdbc.OracleTypes.CLOB // 2005;
+//  // // ,oracle.jdbc.OracleTypes.BFILE // -13;
+//  // // ,oracle.jdbc.OracleTypes.STRUCT // 2002;
+//  // // ,oracle.jdbc.OracleTypes.ARRAY // 2003;
+//  // ,oracle.jdbc.OracleTypes.REF // 2006;
+//  // ,oracle.jdbc.OracleTypes.NCHAR // -15;
+//  // ,oracle.jdbc.OracleTypes.NCLOB // 2011;
+//  // ,oracle.jdbc.OracleTypes.NVARCHAR // -9;
+//  // ,oracle.jdbc.OracleTypes.LONGNVARCHAR // -16;
+//  // // ,oracle.jdbc.OracleTypes.SQLXML // 2009;
+//  // // ,oracle.jdbc.OracleTypes.OPAQUE // 2007;
+//  // // ,oracle.jdbc.OracleTypes.JAVA_STRUCT // 2008;
+//  // // ,oracle.jdbc.OracleTypes.JAVA_OBJECT // 2000;
+//  // // ,oracle.jdbc.OracleTypes.PLSQL_INDEX_TABLE // -14;
+//  // ,oracle.jdbc.OracleTypes.BINARY_FLOAT // 100;
+//  // ,oracle.jdbc.OracleTypes.BINARY_DOUBLE // 101;
+//  // ,oracle.jdbc.OracleTypes.NULL // 0;
+//  // ,oracle.jdbc.OracleTypes.NUMBER // 2;
+//  // // ,oracle.jdbc.OracleTypes.RAW // -2;
+//  // // ,oracle.jdbc.OracleTypes.OTHER // 1111;
+//  // ,oracle.jdbc.OracleTypes.FIXED_CHAR // 999;
+//  // // ,oracle.jdbc.OracleTypes.DATALINK // 70;
+//  // ,oracle.jdbc.OracleTypes.BOOLEAN // 16;
+//  // };
+//
+//  /**
+//   * Constants for things belonging to sqoop...
+//   */
+//  public static final class Sqoop {
+//    private Sqoop() {
+//    }
+//
+//    /**
+//     * What type of Sqoop tool is being run.
+//     */
+//    public enum Tool {
+//      UNKNOWN, IMPORT, EXPORT
+//    }
+//
+//    public static final String IMPORT_TOOL_NAME = "import";
+//    public static final String MAX_MAPREDUCE_ATTEMPTS =
+//        "mapred.map.max.attempts";
+//  }
+//
+  /**
+   * Constants for things belonging to Oracle...
+   */
+  public static final class Oracle {
+    private Oracle() {
+    }
+
+    public static final int ROWID_EXTENDED_ROWID_TYPE = 1;
+    public static final int ROWID_MAX_ROW_NUMBER_PER_BLOCK = 32767;
+
+    // This is how you comment-out a line of SQL text in Oracle.
+    public static final String ORACLE_SQL_STATEMENT_COMMENT_TOKEN = "--";
+
+    public static final String OBJECT_TYPE_TABLE = "TABLE";
+
+    public static final String URITYPE = "URITYPE";
+
+    public static final int MAX_IDENTIFIER_LENGTH = 30; // <- Max length of an
+                                                        // Oracle name
+                                                        // (table-name,
+                                                        // partition-name etc.)
+
+    public static final String HINT_SYNTAX = "/*+ %s */ "; // Syntax for a hint
+                                                           // in Oracle
+  }
+//
+//  /**
+//   * Logging constants.
+//   */
+//  public static class Logging {
+//    /**
+//     * Level of log to output.
+//     */
+//    public enum Level {
+//      TRACE, DEBUG, INFO, WARN, ERROR, FATAL
+//    }
+//  }
+//
+
+}

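The Oracle.HINT_SYNTAX constant above is a printf-style template for embedding an optimizer hint into generated SQL. A minimal, self-contained sketch of how such a template expands (the snippet mirrors the constant's value rather than referencing the class, and the INSERT text is illustrative only):

public class HintSyntaxSketch {
  public static void main(String[] args) {
    // Same value as OracleJdbcConnectorConstants.Oracle.HINT_SYNTAX.
    String hintSyntax = "/*+ %s */ ";
    String hint = String.format(hintSyntax, "APPEND_VALUES");

    // Placing the hint directly after the INSERT keyword yields a statement
    // like "INSERT /*+ APPEND_VALUES */ INTO ...". As the comment above
    // MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT notes, using APPEND_VALUES forces
    // a commit per batch, hence the large default batch size of 5000.
    System.out.println("INSERT " + hint + "INTO SOME_TABLE VALUES (?)");
  }
}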
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
new file mode 100644
index 0000000..30693af
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+// NOTE: All config types have the similar upgrade path at this point
+public class OracleJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+  @Override
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+
+  @Override
+  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+
+  @Override
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
new file mode 100644
index 0000000..df15fc2
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+public class OracleJdbcExtractor extends
+    Extractor<LinkConfiguration, FromJobConfiguration, OracleJdbcPartition> {
+
+  private static final Logger LOG = Logger.getLogger(OracleJdbcExtractor.class);
+
+  private Connection connection;
+  private OracleTable table;
+  private int mapperId; // <- The index of this Hadoop mapper
+  private long rowsRead = 0;
+
+  private OracleTableColumns tableColumns;
+
+  private OracleJdbcPartition dbInputSplit; // <- The split this record-reader
+                                            // is working on.
+  private int numberOfBlocksInThisSplit; // <- The number of Oracle blocks in
+                                         // this Oracle data-chunk.
+  private int numberOfBlocksProcessedInThisSplit; // <- How many Oracle blocks
+                                                  // we've processed with this
+                                                  // record-reader.
+  private String currentDataChunkId; // <- The id of the current data-chunk
+                                     // being processed
+  private ResultSet results; // <- The ResultSet containing the data from the
+                             // query returned by getSelectQuery()
+  private int columnIndexDataChunkIdZeroBased = -1; // <- The zero-based column
+                                                    // index of the
+                                                    // data_chunk_id column.
+  private boolean progressCalculationErrorLogged; // <- Whether we've logged a
+                                                  // problem with the progress
+                                                  // calculation during
+                                                  // nextKeyValue().
+  private Object oraOopOraStats; // <- A reference to the Oracle statistics
+                                 // object that is being tracked for this Oracle
+                                 // session.
+  private boolean profilingEnabled; // <- Whether to collect profiling metrics
+  private long timeSpentInNextKeyValueInNanoSeconds; // <- Total time spent in
+                                                     // super.nextKeyValue()
+
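+  // Format used by getObjectAtName() to re-parse the string form of TIMESTAMP
+  // WITH (LOCAL) TIME ZONE values; a plain java.sql.Timestamp would lose the
+  // time zone information.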
+  private static final DateTimeFormatter TIMESTAMP_TIMEZONE =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z");
+
+  @Override
+  public void extract(ExtractorContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration, OracleJdbcPartition partition) {
+    //TODO: Mapper ID
+    mapperId = 1;
+    dbInputSplit = partition;
+
+    // Retrieve the JDBC URL that should be used by this mapper.
+    String mapperJdbcUrlPropertyName =
+        OracleUtilities.getMapperJdbcUrlPropertyName(mapperId);
+    String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null);
+
+    LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
+        mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));
+
+    try {
+      connection = OracleConnectionFactory.createOracleJdbcConnection(
+          OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+          mapperJdbcUrl,
+          linkConfiguration.connectionConfig.username,
+          linkConfiguration.connectionConfig.password);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\nError:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+
+    table = OracleUtilities.decodeOracleTableName(
+        linkConfiguration.connectionConfig.username,
+        jobConfiguration.fromJobConfig.tableName);
+
+    try {
+      String thisOracleInstanceName =
+          OracleQueries.getCurrentOracleInstanceName(connection);
+
+      LOG.info(String.format(
+          "This record reader is connected to Oracle via the JDBC URL: \n"
+              + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", mapperJdbcUrl,
+              thisOracleInstanceName));
+
+      OracleConnectionFactory.initializeOracleConnection(
+          connection, linkConfiguration.connectionConfig);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to initialize connection to the Oracle database at %s\n"
+              + "Error:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+
+    try {
+      tableColumns =
+          OracleQueries.getFromTableColumns(connection, table, OracleUtilities.
+              omitLobAndLongColumnsDuringImport(jobConfiguration.fromJobConfig),
+              true // <- onlyOraOopSupportedTypes
+              );
+    } catch (SQLException ex) {
+      LOG.error(String.format(
+          "Unable to obtain the data-types of the columns in table %s.\n"
+              + "Error:\n%s", table.toString(), ex.getMessage()));
+      throw new RuntimeException(ex);
+    }
+
+    this.numberOfBlocksInThisSplit =
+        this.dbInputSplit.getTotalNumberOfBlocksInThisSplit();
+    this.numberOfBlocksProcessedInThisSplit = 0;
+
+    extractData(context, jobConfiguration.fromJobConfig);
+
+    try {
+      connection.close();
+    } catch(SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to close connection to the Oracle database at %s\nError:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+  }
+
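+  /**
+   * Converts a single column of the current row into the value written to the
+   * data writer: TEXT columns are read as strings, TIMESTAMP becomes a Joda
+   * LocalDateTime, TIMESTAMP WITH (LOCAL) TIME ZONE is re-read in its string
+   * form and parsed into a zone-aware DateTime, and everything else is passed
+   * through as the object returned by the JDBC driver.
+   */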
+  private Object getObjectAtName(ResultSet resultSet,
+      OracleTableColumn column, Column sqoopColumn) throws SQLException {
+    Object result = null;
+    if (sqoopColumn.getType() == ColumnType.TEXT) {
+      result = resultSet.getString(column.getName());
+    } else if (column.getOracleType() == OracleQueries
+        .getOracleType("TIMESTAMP")) {
+      Timestamp timestamp = resultSet.getTimestamp(column.getName());
+      if (timestamp != null) {
+        result = LocalDateTime.fromDateFields(timestamp);
+      }
+    } else if (column.getOracleType() == OracleQueries
+            .getOracleType("TIMESTAMPTZ")
+        || column.getOracleType() == OracleQueries
+            .getOracleType("TIMESTAMPLTZ")) {
+      Timestamp timestamp = resultSet.getTimestamp(column.getName());
+      if (timestamp != null) {
+        //TODO: BC dates
+        String dateTimeStr = resultSet.getString(column.getName());
+        result = DateTime.parse(dateTimeStr, TIMESTAMP_TIMEZONE);
+      }
+    } else {
+      result = resultSet.getObject(column.getName());
+    }
+    return result;
+  }
+
+  private void extractData(ExtractorContext context, FromJobConfig jobConfig) {
+    String sql = getSelectQuery(jobConfig, context.getContext());
+    Column[] columns = context.getSchema().getColumnsArray();
+    int columnCount = columns.length;
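+    // The Sqoop schema dictates the order of the fields in each output record;
+    // each schema column is matched to its Oracle counterpart by name before
+    // the value is converted.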
+    try {
+      PreparedStatement statement = connection.prepareStatement(sql);
+      ResultSet resultSet = statement.executeQuery();
+
+      while(resultSet.next()) {
+        Object[] array = new Object[columnCount];
+        for(int i = 0; i < columnCount; i++) {
+          OracleTableColumn tableColumn =
+              tableColumns.findColumnByName(columns[i].getName());
+          array[i] = getObjectAtName(resultSet, tableColumn, columns[i]);
+        }
+        context.getDataWriter().writeArrayRecord(array);
+        rowsRead++;
+      }
+
+      resultSet.close();
+      statement.close();
+    } catch (SQLException ex) {
+      LOG.error(String.format("Error in %s while executing the SQL query:\n"
+          + "%s\n\n" + "%s", OracleUtilities.getCurrentMethodName(), sql, ex
+          .getMessage()));
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowsRead;
+  }
+
+  private String getSelectQuery(FromJobConfig jobConfig,
+      ImmutableContext context) {
+
+    boolean consistentRead = BooleanUtils.isTrue(jobConfig.consistentRead);
+    long consistentReadScn = context.getLong(
+        OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN, 0L);
+    if (consistentRead && consistentReadScn == 0L) {
+      throw new RuntimeException("Could not get SCN for consistent read.");
+    }
+
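+    // The statement assembled below is one SELECT per data-chunk, glued
+    // together with UNION ALL. With the SUBSPLIT where-clause location it
+    // takes roughly this shape (identifiers are placeholders, not real names):
+    //
+    //   SELECT <import hint> COL_A,COL_B,'<chunk-id>' <data-chunk-id column>
+    //    FROM <table> [AS OF SCN <scn>] <partition clause> t
+    //    WHERE (<data-chunk predicate>) [AND (<user conditions>)]
+    //   UNION ALL
+    //   ...
+    //
+    // With the SPLIT location the user conditions are instead applied once,
+    // around the whole UNION ALL block.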
+    StringBuilder query = new StringBuilder();
+
+    if (this.dbInputSplit.getDataChunks() == null) {
+      String errMsg =
+          String.format("The %s does not contain any data-chunks, within %s.",
+              this.dbInputSplit.getClass().getName(), OracleUtilities
+                  .getCurrentMethodName());
+      throw new RuntimeException(errMsg);
+    }
+
+    OracleUtilities.OracleTableImportWhereClauseLocation whereClauseLocation =
+        OracleUtilities.getTableImportWhereClauseLocation(jobConfig,
+            OracleUtilities.OracleTableImportWhereClauseLocation.SUBSPLIT);
+
+    int numberOfDataChunks = this.dbInputSplit.getNumberOfDataChunks();
+    for (int idx = 0; idx < numberOfDataChunks; idx++) {
+
+      OracleDataChunk dataChunk =
+          this.dbInputSplit.getDataChunks().get(idx);
+
+      if (idx > 0) {
+        query.append("UNION ALL \n");
+      }
+
+      query.append(getColumnNamesClause(tableColumns,
+               dataChunk.getId(), jobConfig)) // <- SELECT clause
+           .append("\n");
+
+      query.append(" FROM ").append(table.toString()).append(" ");
+
+      if (consistentRead) {
+        query.append("AS OF SCN ").append(consistentReadScn).append(" ");
+      }
+
+      query.append(getPartitionClauseForDataChunk(this.dbInputSplit, idx))
+          .append(" t").append("\n");
+
+      query.append(" WHERE (").append(
+          getWhereClauseForDataChunk(this.dbInputSplit, idx)).append(")\n");
+
+      // If the user wants the WHERE clause applied to each data-chunk...
+      if (whereClauseLocation == OracleUtilities.
+              OracleTableImportWhereClauseLocation.SUBSPLIT) {
+        String conditions = jobConfig.conditions;
+        if (conditions != null && conditions.length() > 0) {
+          query.append(" AND (").append(conditions).append(")\n");
+        }
+      }
+
+    }
+
+    // If the user wants the WHERE clause applied to the whole split...
+    if (whereClauseLocation == OracleUtilities.
+            OracleTableImportWhereClauseLocation.SPLIT) {
+      String conditions = jobConfig.conditions;
+      if (conditions != null && conditions.length() > 0) {
+
+        // Insert a "select everything" line at the start of the SQL query...
+        query.insert(0, getColumnNamesClause(tableColumns, null, jobConfig) +
+            " FROM (\n");
+
+        // ...and then apply the WHERE clause to all the UNIONed sub-queries...
+        query.append(")\n").append("WHERE\n").append(conditions).append("\n");
+      }
+    }
+
+    LOG.info("SELECT QUERY = \n" + query.toString());
+
+    return query.toString();
+  }
+
+  private String getColumnNamesClause(OracleTableColumns tableColumns,
+      String dataChunkId, FromJobConfig jobConfig) {
+
+    StringBuilder result = new StringBuilder();
+
+    result.append("SELECT ");
+    result.append(OracleUtilities.getImportHint(jobConfig));
+
+    int firstFieldIndex = 0;
+    int lastFieldIndex = tableColumns.size();
+    for (int i = firstFieldIndex; i < lastFieldIndex; i++) {
+      if (i > firstFieldIndex) {
+        result.append(",");
+      }
+
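+      // URITYPE columns are selected through uritype.geturl() so that their
+      // value comes back as a plain character string rather than an opaque
+      // object type.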
+      OracleTableColumn oracleTableColumn = tableColumns.get(i);
+      String fieldName = oracleTableColumn.getName();
+      if (oracleTableColumn.getDataType().equals(
+          OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+        fieldName = String.format("uritype.geturl(%s) %s", fieldName,
+            fieldName);
+      }
+
+      result.append(fieldName);
+    }
+    // We need to insert the value of that data_chunk_id now...
+    if (dataChunkId != null && !dataChunkId.isEmpty()) {
+      String fieldName =
+          String.format(",'%s' %s", dataChunkId,
+              OracleJdbcConnectorConstants.COLUMN_NAME_DATA_CHUNK_ID);
+      result.append(fieldName);
+    }
+    return result.toString();
+  }
+
+  private String getPartitionClauseForDataChunk(OracleJdbcPartition split,
+      int dataChunkIndex) {
+    OracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex);
+    return dataChunk.getPartitionClause();
+  }
+
+  private String getWhereClauseForDataChunk(OracleJdbcPartition split,
+      int dataChunkIndex) {
+
+    OracleDataChunk dataChunk = split.getDataChunks().get(dataChunkIndex);
+    return dataChunk.getWhereClause();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
new file mode 100644
index 0000000..bd6fd0a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class OracleJdbcFromDestroyer extends
+    Destroyer<LinkConfiguration, FromJobConfiguration> {
+
+  @Override
+  public void destroy(DestroyerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
new file mode 100644
index 0000000..62a0e84
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.SQLException;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+public class OracleJdbcFromInitializer extends
+    OracleJdbcCommonInitializer<FromJobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcFromInitializer.class);
+
+  @Override
+  public void connect(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) throws SQLException {
+    super.connect(context, linkConfiguration, jobConfiguration);
+    table = OracleUtilities.decodeOracleTableName(
+        linkConfiguration.connectionConfig.username,
+        jobConfiguration.fromJobConfig.tableName);
+  }
+
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) {
+    super.initialize(context, linkConfiguration, jobConfiguration);
+    LOG.debug("Running Oracle JDBC connector FROM initializer");
+
+    try {
+      if (OracleQueries.isTableAnIndexOrganizedTable(connection, table)) {
+        if (OracleUtilities.getOraOopOracleDataChunkMethod(
+            jobConfiguration.fromJobConfig) !=
+                OracleUtilities.OracleDataChunkMethod.PARTITION) {
+          throw new RuntimeException(String.format("Cannot process this Sqoop"
+              + " connection, as the Oracle table %s is an"
+              + " index-organized table. If the table is"
+              + " partitioned, set the data chunk method to "
+              + OracleUtilities.OracleDataChunkMethod.PARTITION
+              + ".",
+              table.toString()));
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(String.format(
+          "Unable to determine whether the Oracle table %s is an"
+          + " index-organized table.", table.toString()), e);
+    }
+
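+    // For a consistent read every mapper must see the table as of the same
+    // point in time, so the SCN is resolved once here and published through
+    // the context; OracleJdbcExtractor later appends it to each query as an
+    // "AS OF SCN" (flashback) clause.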
+    if (BooleanUtils.isTrue(jobConfiguration.fromJobConfig.consistentRead)) {
+      Long scn = jobConfiguration.fromJobConfig.consistentReadScn;
+      if (scn == null || scn.equals(Long.valueOf(0L))) {
+        try {
+          scn = OracleQueries.getCurrentScn(connection);
+        } catch (SQLException e) {
+          throw new RuntimeException("Unable to determine SCN of database.",
+              e);
+        }
+      }
+      context.getContext().setLong(
+          OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN,
+          scn);
+      LOG.info("Performing a consistent read using SCN: " + scn);
+    }
+  }
+
+}
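
Note (illustration only, not part of the patch): a minimal standalone sketch of how the utility APIs above could be exercised to resolve the SCN that the FROM initializer publishes, assuming the factory and query helpers are public; the JDBC URL and credentials are placeholders.

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.sqoop.connector.jdbc.oracle.OracleJdbcConnectorConstants;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;

public class ConsistentReadScnSketch {
  public static void main(String[] args) throws SQLException {
    // Placeholder connection details - substitute real values before running.
    Connection connection = OracleConnectionFactory.createOracleJdbcConnection(
        OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
        "jdbc:oracle:thin:@//dbhost:1521/ORCL", "SQOOP", "sqoop");
    try {
      // Same call the FROM initializer uses to pin the consistent-read SCN.
      Long scn = OracleQueries.getCurrentScn(connection);
      System.out.println("Consistent-read SCN: " + scn);
    } finally {
      connection.close();
    }
  }
}

Run against a live instance, this should print the same SCN that shows up in the "Performing a consistent read using SCN" log line.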