You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/05 18:41:00 UTC
[6/6] sqoop git commit: SQOOP-2595: Add Oracle connector to Sqoop 2
SQOOP-2595: Add Oracle connector to Sqoop 2
(David Robson via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/fa3c77b6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/fa3c77b6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/fa3c77b6
Branch: refs/heads/sqoop2
Commit: fa3c77b6a8352f68ec429164f48aee00ae2480d8
Parents: 2a9ae31
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Nov 5 09:40:31 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Nov 5 09:40:31 2015 -0800
----------------------------------------------------------------------
connector/connector-oracle-jdbc/pom.xml | 134 ++
.../oracle/OracleJdbcCommonInitializer.java | 477 +++++
.../jdbc/oracle/OracleJdbcConnector.java | 92 +
.../oracle/OracleJdbcConnectorConstants.java | 493 +++++
.../oracle/OracleJdbcConnectorUpgrader.java | 43 +
.../jdbc/oracle/OracleJdbcExtractor.java | 361 ++++
.../jdbc/oracle/OracleJdbcFromDestroyer.java | 36 +
.../jdbc/oracle/OracleJdbcFromInitializer.java | 90 +
.../connector/jdbc/oracle/OracleJdbcLoader.java | 615 +++++++
.../jdbc/oracle/OracleJdbcPartition.java | 183 ++
.../jdbc/oracle/OracleJdbcPartitioner.java | 252 +++
.../jdbc/oracle/OracleJdbcToDestroyer.java | 273 +++
.../jdbc/oracle/OracleJdbcToInitializer.java | 498 +++++
.../oracle/configuration/ConnectionConfig.java | 78 +
.../oracle/configuration/FromJobConfig.java | 61 +
.../configuration/FromJobConfiguration.java | 33 +
.../oracle/configuration/LinkConfiguration.java | 34 +
.../jdbc/oracle/configuration/ToJobConfig.java | 64 +
.../configuration/ToJobConfiguration.java | 33 +
.../jdbc/oracle/util/OracleActiveInstance.java | 44 +
.../oracle/util/OracleConnectionFactory.java | 246 +++
.../jdbc/oracle/util/OracleDataChunk.java | 48 +
.../jdbc/oracle/util/OracleDataChunkExtent.java | 109 ++
.../oracle/util/OracleDataChunkPartition.java | 85 +
.../jdbc/oracle/util/OracleGenerics.java | 64 +
.../jdbc/oracle/util/OracleJdbcUrl.java | 244 +++
.../jdbc/oracle/util/OracleQueries.java | 1721 ++++++++++++++++++
.../jdbc/oracle/util/OracleSqlTypesUtils.java | 176 ++
.../connector/jdbc/oracle/util/OracleTable.java | 68 +
.../jdbc/oracle/util/OracleTableColumn.java | 59 +
.../jdbc/oracle/util/OracleTableColumns.java | 43 +
.../jdbc/oracle/util/OracleTablePartition.java | 50 +
.../jdbc/oracle/util/OracleTablePartitions.java | 62 +
.../jdbc/oracle/util/OracleUtilities.java | 1446 +++++++++++++++
.../jdbc/oracle/util/OracleVersion.java | 84 +
.../oracle-jdbc-connector-config.properties | 136 ++
.../main/resources/sqoopconnector.properties | 18 +
.../jdbc/oracle/TestOracleJdbcPartitioner.java | 102 ++
.../jdbc/oracle/TestOracleJdbcUrl.java | 249 +++
.../connector/jdbc/oracle/TestOracleTable.java | 42 +
.../jdbc/oracle/TestOracleUtilities.java | 613 +++++++
.../OracleConnectionFactoryTest.java | 497 +++++
.../oracle/integration/OracleQueriesTest.java | 49 +
.../jdbc/oracle/integration/OracleTestCase.java | 41 +
connector/pom.xml | 1 +
pom.xml | 11 +
server/pom.xml | 5 +
47 files changed, 10163 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/pom.xml b/connector/connector-oracle-jdbc/pom.xml
new file mode 100644
index 0000000..325790d
--- /dev/null
+++ b/connector/connector-oracle-jdbc/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-oracle-jdbc</artifactId>
+ <name>Sqoop Oracle JDBC Connector</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-common-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <finalName>sqoop</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludedGroups>oracle</excludedGroups>
+
+ <excludes>
+ <exclude>**/integration/**</exclude>
+ </excludes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <phase>integration-test</phase>
+ <configuration>
+ <excludes>
+ <exclude>none</exclude>
+ </excludes>
+ <includes>
+ <include>**/integration/**</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>jdbc-oracle</id>
+
+ <activation>
+ <property>
+ <name>jdbc.oracle</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.oracle</groupId>
+ <artifactId>ojdbc14</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+
+ <configuration>
+ <excludedGroups>none</excludedGroups>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
new file mode 100644
index 0000000..1fd95c0
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcCommonInitializer.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleActiveInstance;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleJdbcUrl;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.JdbcOracleThinConnectionParsingError;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+
+public class OracleJdbcCommonInitializer<JobConfiguration> extends Initializer<LinkConfiguration, JobConfiguration> {
+
+ private static final Logger LOG =
+ Logger.getLogger(OracleJdbcCommonInitializer.class);
+
+ protected Connection connection;
+ protected OracleTable table;
+ protected int numMappers = 8;
+
+ /**
+  * Opens a JDBC connection for the given link and keeps it in the
+  * {@code connection} field of this initializer.
+  *
+  * @param context initializer context (not read by this implementation)
+  * @param linkConfiguration link configuration whose connection settings
+  *        are handed to the connection factory
+  * @param jobConfiguration job configuration (not read by this
+  *        implementation)
+  * @throws SQLException propagated from the connection factory
+  */
+ public void connect(InitializerContext context,
+     LinkConfiguration linkConfiguration,
+     JobConfiguration jobConfiguration) throws SQLException {
+   final ConnectionConfig linkConnectionConfig =
+       linkConfiguration.connectionConfig;
+   this.connection =
+       OracleConnectionFactory.makeConnection(linkConnectionConfig);
+ }
+
+ /**
+  * Prepares the job. In order, this logs the welcome banner, opens the
+  * Oracle connection, stores the Oracle session "action" name in the job
+  * context, logs the database version, stores the per-mapper JDBC URLs and
+  * logs the PL/SQL block that can be used to kill the job's sessions.
+  *
+  * @param context initializer context; its mutable context receives the
+  *        values computed here
+  * @param linkConfiguration link configuration providing the connection
+  *        settings
+  * @param jobConfiguration job configuration, passed through to
+  *        {@code connect}
+  * @throws RuntimeException wrapping the {@link SQLException} if the
+  *         connection cannot be opened
+  */
+ @Override
+ public void initialize(InitializerContext context,
+     LinkConfiguration linkConfiguration,
+     JobConfiguration jobConfiguration) {
+   showUserTheOraOopWelcomeMessage();
+
+   try {
+     connect(context, linkConfiguration, jobConfiguration);
+   } catch (SQLException ex) {
+     final String failure = String.format(
+         "Unable to connect to the Oracle database at %s\nError:%s",
+         linkConfiguration.connectionConfig.connectionString,
+         ex.getMessage());
+     throw new RuntimeException(failure, ex);
+   }
+
+   final MutableContext jobContext = context.getContext();
+   final ConnectionConfig connectionConfig =
+       linkConfiguration.connectionConfig;
+
+   // Name the "action" of our Oracle sessions so that the user can tell
+   // which sessions belong to this connector...
+   //TODO: Get the job name
+   final String sessionActionName =
+       getOracleSessionActionName(connectionConfig.username);
+   jobContext.setString(
+       OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME,
+       sessionActionName);
+
+   //TODO: Don't think this can be done anymore
+   //OraOopUtilities.appendJavaSecurityEgd(sqoopOptions.getConf());
+
+   // Report the Oracle database version; failing to read it is logged but
+   // does not abort the job...
+   try {
+     final OracleVersion version =
+         OracleQueries.getOracleVersion(connection);
+     LOG.info(String.format("Oracle Database version: %s",
+         version.getBanner()));
+   } catch (SQLException ex) {
+     LOG.error("Unable to obtain the Oracle database version.", ex);
+   }
+
+   // Generate the JDBC URLs to be used by each mapper...
+   setMapperConnectionDetails(connectionConfig, jobContext);
+
+   // Show the user the Oracle command that can be used to kill this job
+   // via Oracle...
+   showUserTheOracleCommandToKillOraOop(jobContext);
+ }
+
+ /**
+  * Builds the Sqoop schema of the Oracle table by connecting to the
+  * database and looking up the table's column names and data types.
+  *
+  * <p>NOTE(review): {@code table} is not assigned anywhere in this class;
+  * presumably a subclass sets it (for example in its {@code connect}
+  * override) before this method dereferences it - confirm.
+  *
+  * @param context initializer context, passed through to {@code connect}
+  * @param linkConfiguration link configuration providing the connection
+  *        settings
+  * @param jobConfiguration job configuration, passed through to
+  *        {@code connect}
+  * @return a schema named after {@code table} holding one column per
+  *         column reported for the table
+  * @throws RuntimeException if the connection cannot be opened or the
+  *         columns cannot be determined
+  */
+ @Override
+ public Schema getSchema(InitializerContext context,
+     LinkConfiguration linkConfiguration,
+     JobConfiguration jobConfiguration) {
+   try {
+     connect(context, linkConfiguration, jobConfiguration);
+   } catch (SQLException ex) {
+     final String failure = String.format(
+         "Unable to connect to the Oracle database at %s\nError:%s",
+         linkConfiguration.connectionConfig.connectionString,
+         ex.getMessage());
+     throw new RuntimeException(failure, ex);
+   }
+
+   final Schema tableSchema = new Schema(table.toString());
+
+   try {
+     final List<String> columnNames = OracleQueries.getToTableColumnNames(
+         connection, table, true, true);
+
+     for (Column column
+         : OracleQueries.getColDataTypes(connection, table, columnNames)) {
+       tableSchema.addColumn(column);
+     }
+   } catch (Exception e) {
+     throw new RuntimeException(
+         "Could not determine columns in Oracle Table.", e);
+   }
+
+   return tableSchema;
+ }
+
+ private void showUserTheOraOopWelcomeMessage() {
+
+ String msg1 =
+ String.format("Using %s",
+ OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME);
+
+ int longestMessage = msg1.length();
+
+ msg1 = StringUtils.rightPad(msg1, longestMessage);
+
+ char[] asterisks = new char[longestMessage + 8];
+ Arrays.fill(asterisks, '*');
+
+ String msg =
+ String.format("\n" + "%1$s\n" + "*** %2$s ***\n" + "%1$s", new String(
+ asterisks), msg1);
+ LOG.info(msg);
+ }
+
+ private String getOracleSessionActionName(String jobName) {
+
+ String timeStr =
+ (new SimpleDateFormat("yyyyMMddHHmmsszzz")).format(new Date());
+
+ String result = String.format("%s %s", jobName, timeStr);
+
+ // NOTE: The "action" column of v$session is only a 32 character column.
+ // Therefore we need to ensure that the string returned by this
+ // method does not exceed 32 characters...
+ if (result.length() > 32) {
+ result = result.substring(0, 32).trim();
+ }
+
+ return result;
+ }
+
+ /**
+  * Decides which JDBC URL every mapper should connect with and stores the
+  * URLs in the job context.
+  *
+  * <p>Nothing is stored when dynamic URL generation is disabled for the
+  * link, or when the configured connection string cannot be parsed as an
+  * Oracle 'thin' URL. Otherwise the URLs are generated either for the RAC
+  * (through the configured service name when there is one, else per active
+  * instance) or from the TNS name / SID / service of the connection string.
+  *
+  * @param connectionConfig link connection settings (connection string,
+  *        credentials, RAC service name, extra JDBC properties)
+  * @param context job context that receives the per-mapper URLs
+  * @throws RuntimeException if the active instances cannot be queried, or
+  *         if the configured RAC service name cannot be connected to
+  */
+ private void setMapperConnectionDetails(ConnectionConfig connectionConfig,
+     MutableContext context) {
+
+   // Query v$active_instances to get a list of all instances in the Oracle RAC
+   // (assuming this *could* be a RAC)...
+   List<OracleActiveInstance> activeInstances = null;
+   try {
+     activeInstances =
+         OracleQueries.getOracleActiveInstances(connection);
+   } catch (SQLException ex) {
+     throw new RuntimeException(
+         "An error was encountered when attempting to determine the "
+         + "configuration of the Oracle RAC.",
+         ex);
+   }
+
+   // A null list is treated as "this database is not a RAC".
+   if (activeInstances == null) {
+     LOG.info("This Oracle database is not a RAC.");
+   } else {
+     LOG.info("This Oracle database is a RAC.");
+   }
+
+   // Is dynamic JDBC URL generation disabled?...
+   if (OracleUtilities.oracleJdbcUrlGenerationDisabled(connectionConfig)) {
+     LOG.info(String
+         .format(
+             "%s will not use dynamically generated JDBC URLs - this feature "
+             + "has been disabled.",
+             OracleJdbcConnectorConstants.CONNECTOR_NAME));
+     return;
+   }
+
+   boolean generateRacBasedJdbcUrls = false;
+
+   // Decide whether this is a multi-instance RAC, and whether we need to do
+   // anything more...
+   if (activeInstances != null) {
+     generateRacBasedJdbcUrls = true;
+
+     if (activeInstances.size() < OracleJdbcConnectorConstants.
+         MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) {
+       LOG.info(String.format(
+           "There are only %d active instances in the Oracle RAC. "
+           + "%s will not bother utilizing dynamically generated JDBC URLs.",
+           activeInstances.size(),
+           OracleJdbcConnectorConstants.CONNECTOR_NAME));
+       generateRacBasedJdbcUrls = false;
+     }
+   }
+
+   // E.g. jdbc:oracle:thin:@localhost.localdomain:1521:orcl
+   String jdbcConnectStr = connectionConfig.connectionString;
+
+   // Parse the JDBC URL to obtain the port number for the TNS listener...
+   String jdbcHost = "";
+   int jdbcPort = 0;
+   String jdbcSid = "";
+   String jdbcService = "";
+   String jdbcTnsName = "";
+   try {
+
+     OracleJdbcUrl oraOopJdbcUrl = new OracleJdbcUrl(jdbcConnectStr);
+     OracleUtilities.JdbcOracleThinConnection jdbcConnection =
+         oraOopJdbcUrl.parseJdbcOracleThinConnectionString();
+     jdbcHost = jdbcConnection.getHost();
+     jdbcPort = jdbcConnection.getPort();
+     jdbcSid = jdbcConnection.getSid();
+     jdbcService = jdbcConnection.getService();
+     jdbcTnsName = jdbcConnection.getTnsName();
+   } catch (JdbcOracleThinConnectionParsingError ex) {
+     // The exception is handed to the logger rather than to String.format:
+     // the format string has only two placeholders, so a third format
+     // argument would be silently discarded and the cause never logged.
+     LOG.info(String.format(
+         "Unable to parse the JDBC connection URL \"%s\" as a connection "
+         + "that uses the Oracle 'thin' JDBC driver.\n"
+         + "This problem prevents %s from being able to dynamically generate "
+         + "JDBC URLs that specify 'dedicated server connections' or spread "
+         + "mapper sessions across multiple Oracle instances.\n"
+         + "If the JDBC driver-type is 'OCI' (instead of 'thin'), then "
+         + "load-balancing should be appropriately managed automatically.",
+         jdbcConnectStr, OracleJdbcConnectorConstants.CONNECTOR_NAME), ex);
+     return;
+   }
+
+   if (generateRacBasedJdbcUrls) {
+
+     // Retrieve the Oracle service name to use when connecting to the RAC...
+     String oracleServiceName = connectionConfig.racServiceName;
+
+     // Generate JDBC URLs for each of the mappers. A missing (null) service
+     // name is handled like an empty one, instead of failing with a
+     // NullPointerException...
+     if (oracleServiceName != null && !oracleServiceName.isEmpty()) {
+       if (!generateRacJdbcConnectionUrlsByServiceName(jdbcHost, jdbcPort,
+           oracleServiceName, connectionConfig, context)) {
+         throw new RuntimeException(String.format(
+             "Unable to connect to the Oracle database at %s "
+             + "via the service name \"%s\".", jdbcConnectStr,
+             oracleServiceName));
+       }
+     } else {
+       generateJdbcConnectionUrlsByActiveInstance(activeInstances, jdbcPort,
+           connectionConfig, context);
+     }
+   } else {
+     generateJdbcConnectionUrlsByTnsnameSidOrService(jdbcHost, jdbcPort,
+         jdbcSid, jdbcService, jdbcTnsName, connectionConfig, context);
+   }
+
+ }
+
+ private boolean generateRacJdbcConnectionUrlsByServiceName(String hostName,
+ int port, String serviceName, ConnectionConfig connectionConfig,
+ MutableContext context) {
+
+ boolean result = false;
+ String jdbcUrl =
+ OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
+ serviceName);
+
+ if (testDynamicallyGeneratedOracleRacInstanceConnection(jdbcUrl,
+ connectionConfig.username, connectionConfig.password,
+ connectionConfig.jdbcProperties
+ , false // <- ShowInstanceSysTimestamp
+ , "" // <- instanceDescription
+ )) {
+
+ LOG.info(String.format(
+ "%s will load-balance sessions across the Oracle RAC instances "
+ + "by connecting each mapper to the Oracle Service \"%s\".",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME, serviceName));
+
+ // Now store these connection strings in such a way that each mapper knows
+ // which one to use...
+ for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+ storeJdbcUrlForMapper(idxMapper, jdbcUrl, context);
+ }
+ result = true;
+ }
+ return result;
+ }
+
+ private void generateJdbcConnectionUrlsByTnsnameSidOrService(String hostName,
+ int port, String sid, String serviceName, String tnsName,
+ ConnectionConfig connectionConfig, MutableContext context) {
+
+ String jdbcUrl = null;
+ if (tnsName != null && !tnsName.isEmpty()) {
+ jdbcUrl = OracleUtilities.generateOracleTnsNameJdbcUrl(tnsName);
+ } else if (sid != null && !sid.isEmpty()) {
+ jdbcUrl = OracleUtilities.generateOracleSidJdbcUrl(hostName, port, sid);
+ } else {
+ jdbcUrl =
+ OracleUtilities.generateOracleServiceNameJdbcUrl(hostName, port,
+ serviceName);
+ }
+
+ // Now store these connection strings in such a way that each mapper knows
+ // which one to use...
+ for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+ storeJdbcUrlForMapper(idxMapper, jdbcUrl, context);
+ }
+ }
+
+ private void
+ generateJdbcConnectionUrlsByActiveInstance(
+ List<OracleActiveInstance> activeInstances, int jdbcPort,
+ ConnectionConfig connectionConfig, MutableContext context) {
+
+ // Generate JDBC URLs for each of the instances in the RAC...
+ ArrayList<OracleUtilities.JdbcOracleThinConnection>
+ jdbcOracleActiveThinConnections =
+ new ArrayList<OracleUtilities.JdbcOracleThinConnection>(
+ activeInstances.size());
+
+ for (OracleActiveInstance activeInstance : activeInstances) {
+
+ OracleUtilities.JdbcOracleThinConnection
+ jdbcActiveInstanceThinConnection =
+ new OracleUtilities.JdbcOracleThinConnection(
+ activeInstance.getHostName(),
+ jdbcPort, activeInstance.getInstanceName(), "", "");
+
+ if (testDynamicallyGeneratedOracleRacInstanceConnection(
+ jdbcActiveInstanceThinConnection.toString(),
+ connectionConfig.username,
+ connectionConfig.password, connectionConfig.jdbcProperties,
+ true, activeInstance.getInstanceName())) {
+ jdbcOracleActiveThinConnections.add(jdbcActiveInstanceThinConnection);
+ }
+ }
+
+ // If there are multiple JDBC URLs that work okay for the RAC, then we'll
+ // make use of them...
+ if (jdbcOracleActiveThinConnections.size() < OracleJdbcConnectorConstants.
+ MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS) {
+ LOG.info(String
+ .format(
+ "%s will not attempt to load-balance sessions across instances "
+ + "of an Oracle RAC - as multiple JDBC URLs to the "
+ + "Oracle RAC could not be dynamically generated.",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME));
+ return;
+ } else {
+ StringBuilder msg = new StringBuilder();
+ msg.append(String
+ .format(
+ "%s will load-balance sessions across the following instances of"
+ + "the Oracle RAC:\n",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME));
+
+ for (OracleUtilities.JdbcOracleThinConnection thinConnection
+ : jdbcOracleActiveThinConnections) {
+ msg.append(String.format("\tInstance: %s \t URL: %s\n",
+ thinConnection.getSid(), thinConnection.toString()));
+ }
+ LOG.info(msg.toString());
+ }
+
+ // Now store these connection strings in such a way that each mapper knows
+ // which one to use...
+ int racInstanceIdx = 0;
+ OracleUtilities.JdbcOracleThinConnection thinUrl;
+ for (int idxMapper = 0; idxMapper < numMappers; idxMapper++) {
+ if (racInstanceIdx > jdbcOracleActiveThinConnections.size() - 1) {
+ racInstanceIdx = 0;
+ }
+ thinUrl = jdbcOracleActiveThinConnections.get(racInstanceIdx);
+ racInstanceIdx++;
+ storeJdbcUrlForMapper(idxMapper, thinUrl.toString(), context);
+ }
+ }
+
+ private void storeJdbcUrlForMapper(int mapperIdx, String jdbcUrl,
+ MutableContext context) {
+
+ // Now store these connection strings in such a way that each mapper knows
+ // which one to use...
+ String mapperJdbcUrlPropertyName =
+ OracleUtilities.getMapperJdbcUrlPropertyName(mapperIdx);
+ LOG.debug("Setting mapper url " + mapperJdbcUrlPropertyName + " = "
+ + jdbcUrl);
+ context.setString(mapperJdbcUrlPropertyName, jdbcUrl);
+ }
+
+ /**
+  * Checks whether a dynamically generated JDBC URL can be connected to.
+  *
+  * <p>A connection is opened with the given credentials and closed again;
+  * optionally the database time of the instance is logged in between. Any
+  * {@link SQLException} is logged as a warning and reported as failure.
+  *
+  * @param url JDBC URL to test
+  * @param userName database user
+  * @param password password of the database user
+  * @param jdbcProperties extra JDBC properties for the connection; may be
+  *        null
+  * @param showInstanceSysTimestamp whether to query and log the database
+  *        time of the instance
+  * @param instanceDescription label used for the instance in that log line
+  * @return {@code true} if the connection was opened, used and closed
+  *         without an {@link SQLException}, {@code false} otherwise
+  */
+ private boolean testDynamicallyGeneratedOracleRacInstanceConnection(
+     String url, String userName, String password,
+     Map<String, String> jdbcProperties,
+     boolean showInstanceSysTimestamp, String instanceDescription) {
+
+   boolean result = false;
+
+   // Test the connection...
+   try {
+     Properties additionalProps = new Properties();
+     if (jdbcProperties != null) {
+       additionalProps.putAll(jdbcProperties);
+     }
+     Connection testConnection =
+         OracleConnectionFactory.createOracleJdbcConnection(
+             OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+             url, userName, password, additionalProps);
+
+     try {
+       // Show the system time on each instance...
+       if (showInstanceSysTimestamp) {
+         LOG.info(String.format("\tDatabase time on %s is %s",
+             instanceDescription, OracleQueries
+                 .getSysTimeStamp(testConnection)));
+       }
+     } finally {
+       // Always release the test connection, also when the timestamp query
+       // fails; otherwise the session would be leaked.
+       testConnection.close();
+     }
+     result = true;
+   } catch (SQLException ex) {
+     LOG.warn(
+         String
+             .format(
+                 "The dynamically generated JDBC URL \"%s\" was unable to "
+                 + "connect to an instance in the Oracle RAC.",
+                 url), ex);
+   }
+
+   return result;
+ }
+
+ private void showUserTheOracleCommandToKillOraOop(MutableContext context) {
+
+ String moduleName =
+ OracleJdbcConnectorConstants.ORACLE_SESSION_MODULE_NAME;
+ String actionName = context.getString(
+ OracleJdbcConnectorConstants.ORACLE_SESSION_ACTION_NAME);
+
+ String msg = String.format(
+ "\nNote: This %s job can be killed via Oracle by executing the "
+ + "following statement:\n\tbegin\n"
+ + "\t\tfor row in (select sid,serial# from v$session where module='%s' "
+ + "and action='%s') loop\n"
+ + "\t\t\texecute immediate 'alter system kill session ''' || row.sid || "
+ + "',' || row.serial# || '''';\n"
+ + "\t\tend loop;\n" + "\tend;",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME, moduleName, actionName);
+ LOG.info(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
new file mode 100644
index 0000000..ae0b9dc
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnector.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+
+/**
+ * Sqoop connector definition for Oracle over JDBC. It wires together the
+ * classes that implement the FROM and TO directions and exposes the
+ * connector's configuration classes, resource bundle and upgrader.
+ */
+public class OracleJdbcConnector extends SqoopConnector {
+
+  /** Classes implementing the FROM direction. */
+  private static final From FROM_DEFINITION = new From(
+      OracleJdbcFromInitializer.class,
+      OracleJdbcPartitioner.class,
+      OracleJdbcPartition.class,
+      OracleJdbcExtractor.class,
+      OracleJdbcFromDestroyer.class);
+
+  /** Classes implementing the TO direction. */
+  private static final To TO_DEFINITION = new To(
+      OracleJdbcToInitializer.class,
+      OracleJdbcLoader.class,
+      OracleJdbcToDestroyer.class);
+
+  /** @return the build version reported by {@link VersionInfo} */
+  @Override
+  public String getVersion() {
+    return VersionInfo.getBuildVersion();
+  }
+
+  /**
+   * @param locale locale to load the bundle for
+   * @return the connector's resource bundle for the given locale
+   */
+  @Override
+  public ResourceBundle getBundle(Locale locale) {
+    final String bundleName =
+        OracleJdbcConnectorConstants.RESOURCE_BUNDLE_NAME;
+    return ResourceBundle.getBundle(bundleName, locale);
+  }
+
+  /** @return the class describing the link configuration */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class getLinkConfigurationClass() {
+    return LinkConfiguration.class;
+  }
+
+  /**
+   * @param jobType direction the job configuration is requested for
+   * @return the FROM or TO job configuration class, or {@code null} for any
+   *         other direction
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class getJobConfigurationClass(Direction jobType) {
+    final Class configurationClass;
+    switch (jobType) {
+      case FROM:
+        configurationClass = FromJobConfiguration.class;
+        break;
+      case TO:
+        configurationClass = ToJobConfiguration.class;
+        break;
+      default:
+        configurationClass = null;
+        break;
+    }
+    return configurationClass;
+  }
+
+  /** @return the FROM direction definition of this connector */
+  @Override
+  public From getFrom() {
+    return FROM_DEFINITION;
+  }
+
+  /** @return the TO direction definition of this connector */
+  @Override
+  public To getTo() {
+    return TO_DEFINITION;
+  }
+
+  /**
+   * @param oldConnectorVersion version being upgraded from (not read here)
+   * @return a new upgrader instance on every call
+   */
+  @Override
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader(String oldConnectorVersion) {
+    return new OracleJdbcConnectorUpgrader();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
new file mode 100644
index 0000000..2215cf3
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorConstants.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+/**
+ * Constants shared across the Oracle JDBC connector.
+ *
+ * <p>This class only holds static constants and cannot be instantiated.
+ */
+public final class OracleJdbcConnectorConstants {
+
+  private OracleJdbcConnectorConstants() {
+    // Constant holder: not meant to be instantiated.
+  }
+
+  // Resource bundle name
+  public static final String RESOURCE_BUNDLE_NAME =
+      "oracle-jdbc-connector-config";
+
+  public static final String CONNECTOR_NAME = "Sqoop Oracle Connector";
+
+  // The string we want to pass to dbms_application_info.set_module() via the
+  // "module_name" parameter...
+  public static final String ORACLE_SESSION_MODULE_NAME = CONNECTOR_NAME;
+
+  public static final String ORACLE_SESSION_ACTION_NAME =
+      "oracle.session.module.action";
+
+  // How many rows to pre-fetch when executing Oracle queries...
+  public static final int ORACLE_ROW_FETCH_SIZE_DEFAULT = 5000;
+
+  // The name of the Oracle JDBC class...
+  public static final String ORACLE_JDBC_DRIVER_CLASS =
+      "oracle.jdbc.OracleDriver";
+
+  // Default session initialization statements, each terminated by ';'.
+  // The last statement starts with "--" (see
+  // Oracle.ORACLE_SQL_STATEMENT_COMMENT_TOKEN); presumably the code that
+  // runs these statements skips it as commented out - TODO confirm.
+  public static final String ORACLE_SESSION_INITIALIZATION_STATEMENTS_DEFAULT =
+      "alter session disable parallel query;" +
+      "alter session set \"_serial_direct_read\"=true;" +
+      "alter session set tracefile_identifier=oraoop;" +
+      "--alter session set events '10046 trace name context forever, level 8';";
+
+  // The SYSDATE from the Oracle database when this OraOop job was started.
+  // This is used to generate unique names for partitions and temporary tables
+  // that we create during the job...
+  public static final String SQOOP_ORACLE_JOB_SYSDATE =
+      "sqoop.oracle.job.sysdate";
+
+  // The minimum number of active instances in an Oracle RAC required for OraOop
+  // to use dynamically generated JDBC URLs...
+  public static final int MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS =
+      2;
+
+  // The name of the data_chunk_id column that OraOop appends to each (import)
+  // query...
+  public static final String COLUMN_NAME_DATA_CHUNK_ID = "data_chunk_id";
+
+  // Pseudo-columns added to a partitioned export table (created by OraOop from
+  // a template table) to store the partition value and subpartition value.
+  // The partition value is the sysdate when the job was performed. The
+  // subpartition value is the mapper index...
+  public static final String COLUMN_NAME_EXPORT_PARTITION =
+      "SQOOP_EXPORT_SYSDATE";
+  public static final String COLUMN_NAME_EXPORT_SUBPARTITION =
+      "SQOOP_MAPPER_ID";
+  public static final String COLUMN_NAME_EXPORT_MAPPER_ROW =
+      "SQOOP_MAPPER_ROW";
+
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_VALUE =
+      "oraoop.export.partition.date.value";
+  public static final String ORAOOP_EXPORT_PARTITION_DATE_FORMAT =
+      "yyyy-mm-dd hh24:mi:ss";
+
+  // The SCN number to use for the consistent read
+  public static final String ORACLE_IMPORT_CONSISTENT_READ_SCN =
+      "oracle.import.consistent.read.scn";
+
+  // Indicates (internally) that the export table we're dealing with has been
+  // partitioned by Sqoop...
+  public static final String EXPORT_TABLE_HAS_SQOOP_PARTITIONS =
+      "sqoop.export.table.has.sqoop.partitions";
+
+  // When using the Oracle hint... /* +APPEND_VALUES */ ...a commit must be
+  // performed after each batch insert.
+  // Therefore, the batches need to be quite large to avoid a performance
+  // penalty (for the 'extra' commits).
+  // This is the minimum batch size to use under these conditions...
+  public static final int MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT = 5000;
+
+  // When OraOop creates a table for a Sqoop export (from a template table) and
+  // the table contains partitions, this is the prefix of those partition
+  // names. (This also allows us to later identify partitions that OraOop
+  // created.)
+  public static final String EXPORT_TABLE_PARTITION_NAME_PREFIX = "SQOOP_";
+
+  // When OraOop creates temporary tables for each mapper during a Sqoop export
+  // this is the prefix of table names...
+  public static final String EXPORT_MAPPER_TABLE_NAME_PREFIX = "SQOOP_";
+
+  // The format string used to turn a DATE into a string for use within the
+  // names of Oracle objects that we create. For example, temporary tables,
+  // table partitions, table subpartitions...
+  public static final String ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING =
+      "yyyymmdd_hh24miss";
+
+  /**
+   * Whether to use the append values hint for exports.
+   */
+  public enum AppendValuesHintUsage {
+    AUTO, ON, OFF
+  }
+
+  // http://download.oracle.com/docs/cd/E11882_01/server.112/e17118/
+  // sql_elements001.htm#i45441
+  //
+  // Data types accepted for import. Deliberately left out of the IN list:
+  // BFILE, LONG RAW, MLSLABEL, UNDEFINED, XMLTYPE and UROWID (SqlType = 1111 =
+  // "OTHER"; not supported as "AAAAACAADAAAAAEAAF" is being returned as
+  // "AAAAAAgADAAAA").
+  // Columns declared as VARCHAR are listed as VARCHAR2 in dba_tab_columns.
+  public static final String SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+      + "'BINARY_DOUBLE',"
+      + "'BINARY_FLOAT',"
+      + "'BLOB',"
+      + "'CHAR',"
+      + "'CLOB',"
+      + "'DATE',"
+      + "'FLOAT',"
+      + "'LONG',"
+      + "'NCHAR',"
+      + "'NCLOB',"
+      + "'NUMBER',"
+      + "'NVARCHAR2',"
+      + "'RAW',"
+      + "'ROWID',"
+      + "'URITYPE',"
+      + "'VARCHAR2'"
+      + ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+      + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+  // Data types accepted for export. Deliberately left out of the IN list:
+  // BLOB, CLOB and NCLOB (Jira: SQOOP-117 Sqoop cannot export LOB data),
+  // LONG ("create table as select..." and "insert into table as select..."
+  // do not work when a long column exists), BFILE, LONG RAW, MLSLABEL, RAW,
+  // UNDEFINED, XMLTYPE and UROWID (SqlType = 1111 = "OTHER"; not supported as
+  // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA").
+  // Columns declared as VARCHAR are listed as VARCHAR2 in dba_tab_columns.
+  public static final String SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE =
+      "(DATA_TYPE IN ("
+      + "'BINARY_DOUBLE',"
+      + "'BINARY_FLOAT',"
+      + "'CHAR',"
+      + "'DATE',"
+      + "'FLOAT',"
+      + "'NCHAR',"
+      + "'NUMBER',"
+      + "'NVARCHAR2',"
+      + "'ROWID',"
+      + "'URITYPE',"
+      + "'VARCHAR2'"
+      + ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+      + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+      + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+  /**
+   * Constants for things belonging to Oracle...
+   */
+  public static final class Oracle {
+    private Oracle() {
+    }
+
+    public static final int ROWID_EXTENDED_ROWID_TYPE = 1;
+    public static final int ROWID_MAX_ROW_NUMBER_PER_BLOCK = 32767;
+
+    // This is how you comment-out a line of SQL text in Oracle.
+    public static final String ORACLE_SQL_STATEMENT_COMMENT_TOKEN = "--";
+
+    public static final String OBJECT_TYPE_TABLE = "TABLE";
+
+    public static final String URITYPE = "URITYPE";
+
+    // Max length of an Oracle name (table-name, partition-name etc.)
+    public static final int MAX_IDENTIFIER_LENGTH = 30;
+
+    // Syntax for a hint in Oracle
+    public static final String HINT_SYNTAX = "/*+ %s */ ";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
new file mode 100644
index 0000000..30693af
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcConnectorUpgrader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+// NOTE: All config types share the same upgrade path at this point
+/**
+ * Configuration upgrader for the Oracle JDBC connector. Link, FROM-job and
+ * TO-job configurations are each upgraded by handing the original and target
+ * config lists to {@code ConfigurableUpgradeUtil.doUpgrade}.
+ */
+public class OracleJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+  /**
+   * Upgrades a link configuration.
+   *
+   * @param original      configuration being upgraded from
+   * @param upgradeTarget configuration being upgraded into
+   */
+  @Override
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(
+        original.getConfigs(),
+        upgradeTarget.getConfigs());
+  }
+
+  /**
+   * Upgrades a FROM-job configuration.
+   *
+   * @param original      configuration being upgraded from
+   * @param upgradeTarget configuration being upgraded into
+   */
+  @Override
+  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(
+        original.getConfigs(),
+        upgradeTarget.getConfigs());
+  }
+
+  /**
+   * Upgrades a TO-job configuration.
+   *
+   * @param original      configuration being upgraded from
+   * @param upgradeTarget configuration being upgraded into
+   */
+  @Override
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(
+        original.getConfigs(),
+        upgradeTarget.getConfigs());
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
new file mode 100644
index 0000000..df15fc2
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcExtractor.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+public class OracleJdbcExtractor extends
+ Extractor<LinkConfiguration, FromJobConfiguration, OracleJdbcPartition> {
+
+ private static final Logger LOG = Logger.getLogger(OracleJdbcExtractor.class);
+
+ // JDBC connection used for the extraction; opened and closed by extract().
+ private Connection connection;
+ // The table being read, decoded in extract() from the link's username and
+ // the job's table name.
+ private OracleTable table;
+ private int mapperId; // <- The index of this Hadoop mapper
+ // Number of records handed to the data writer so far (see extractData()).
+ private long rowsRead = 0;
+
+ // Column metadata of the table, loaded in extract() via
+ // OracleQueries.getFromTableColumns().
+ private OracleTableColumns tableColumns;
+
+ private OracleJdbcPartition dbInputSplit; // <- The split this record-reader
+ // is working on.
+ private int numberOfBlocksInThisSplit; // <- The number of Oracle blocks in
+ // this Oracle data-chunk.
+ private int numberOfBlocksProcessedInThisSplit; // <- How many Oracle blocks
+ // we've processed with this
+ // record-reader.
+ private String currentDataChunkId; // <- The id of the current data-chunk
+ // being processed
+ private ResultSet results; // <- The ResultSet containing the data from the
+ // query returned by getSelectQuery()
+ private int columnIndexDataChunkIdZeroBased = -1; // <- The zero-based column
+ // index of the
+ // data_chunk_id column.
+ private boolean progressCalculationErrorLogged; // <- Whether we've logged a
+ // problem with the progress
+ // calculation during
+ // nextKeyValue().
+ private Object oraOopOraStats; // <- A reference to the Oracle statistics
+ // object that is being tracked for this Oracle
+ // session.
+ private boolean profilingEnabled; // <- Whether to collect profiling metrics
+ private long timeSpentInNextKeyValueInNanoSeconds; // <- Total time spent in
+ // super.nextKeyValue()
+
+ // Format used by getObjectAtName() to parse the string form of
+ // TIMESTAMPTZ / TIMESTAMPLTZ column values.
+ // NOTE(review): relies on the driver's getString() output matching this
+ // pattern, including a zone name parseable via 'z' - confirm.
+ private static final DateTimeFormatter TIMESTAMP_TIMEZONE =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z");
+
+ @Override
+ public void extract(ExtractorContext context,
+ LinkConfiguration linkConfiguration,
+ FromJobConfiguration jobConfiguration, OracleJdbcPartition partition) {
+ //TODO: Mapper ID
+ mapperId = 1;
+ dbInputSplit = partition;
+
+ // Retrieve the JDBC URL that should be used by this mapper.
+ String mapperJdbcUrlPropertyName =
+ OracleUtilities.getMapperJdbcUrlPropertyName(mapperId);
+ String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null);
+
+ LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
+ mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));
+
+ try {
+ connection = OracleConnectionFactory.createOracleJdbcConnection(
+ OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+ mapperJdbcUrl,
+ linkConfiguration.connectionConfig.username,
+ linkConfiguration.connectionConfig.password);
+ } catch (SQLException ex) {
+ throw new RuntimeException(String.format(
+ "Unable to connect to the Oracle database at %s\nError:%s",
+ linkConfiguration.connectionConfig.connectionString, ex
+ .getMessage()), ex);
+ }
+
+ table = OracleUtilities.decodeOracleTableName(
+ linkConfiguration.connectionConfig.username,
+ jobConfiguration.fromJobConfig.tableName);
+
+ try {
+ String thisOracleInstanceName =
+ OracleQueries.getCurrentOracleInstanceName(connection);
+
+ LOG.info(String.format(
+ "This record reader is connected to Oracle via the JDBC URL: \n"
+ + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", mapperJdbcUrl,
+ thisOracleInstanceName));
+
+ OracleConnectionFactory.initializeOracleConnection(
+ connection, linkConfiguration.connectionConfig);
+ } catch(SQLException ex) {
+ throw new RuntimeException(String.format(
+ "Unable to initialize connection to the Oracle database at %s\n"
+ + "Error:%s",
+ linkConfiguration.connectionConfig.connectionString, ex
+ .getMessage()), ex);
+ }
+
+ try {
+ tableColumns =
+ OracleQueries.getFromTableColumns(connection, table, OracleUtilities.
+ omitLobAndLongColumnsDuringImport(jobConfiguration.fromJobConfig),
+ true // <- onlyOraOopSupportedTypes
+ );
+ } catch (SQLException ex) {
+ LOG.error(String.format(
+ "Unable to obtain the data-types of the columns in table %s.\n"
+ + "Error:\n%s", table.toString(), ex.getMessage()));
+ throw new RuntimeException(ex);
+ }
+
+ this.numberOfBlocksInThisSplit =
+ this.dbInputSplit.getTotalNumberOfBlocksInThisSplit();
+ this.numberOfBlocksProcessedInThisSplit = 0;
+
+ extractData(context, jobConfiguration.fromJobConfig);
+
+ try {
+ connection.close();
+ } catch(SQLException ex) {
+ throw new RuntimeException(String.format(
+ "Unable to close connection to the Oracle database at %s\nError:%s",
+ linkConfiguration.connectionConfig.connectionString, ex
+ .getMessage()), ex);
+ }
+ }
+
+ /**
+  * Reads one column of the current result-set row and converts it to the
+  * object handed to the data writer.
+  *
+  * <ul>
+  * <li>Sqoop TEXT columns are read with {@code getString}.</li>
+  * <li>Oracle TIMESTAMP columns become a Joda {@code LocalDateTime}.</li>
+  * <li>Oracle TIMESTAMPTZ / TIMESTAMPLTZ columns are read as a string and
+  * parsed into a Joda {@code DateTime} using TIMESTAMP_TIMEZONE.</li>
+  * <li>Everything else is read with {@code getObject}.</li>
+  * </ul>
+  *
+  * @param resultSet   result set positioned on the row to read
+  * @param column      Oracle metadata of the column to read
+  * @param sqoopColumn Sqoop schema column matching {@code column}
+  * @return the converted value; {@code null} when a timestamp column
+  *         holds SQL NULL
+  * @throws SQLException if the value cannot be read from the result set
+  */
+ private Object getObjectAtName(ResultSet resultSet,
+     OracleTableColumn column, Column sqoopColumn) throws SQLException {
+   if (sqoopColumn.getType() == ColumnType.TEXT) {
+     return resultSet.getString(column.getName());
+   }
+
+   if (column.getOracleType() == OracleQueries.getOracleType("TIMESTAMP")) {
+     Timestamp plainTimestamp = resultSet.getTimestamp(column.getName());
+     return plainTimestamp == null
+         ? null
+         : LocalDateTime.fromDateFields(plainTimestamp);
+   }
+
+   boolean hasTimeZone =
+       column.getOracleType() == OracleQueries.getOracleType("TIMESTAMPTZ")
+       || column.getOracleType() == OracleQueries.getOracleType("TIMESTAMPLTZ");
+   if (!hasTimeZone) {
+     return resultSet.getObject(column.getName());
+   }
+
+   // The timestamp is fetched only to detect SQL NULL; the value itself is
+   // parsed from the string representation.
+   Timestamp nullProbe = resultSet.getTimestamp(column.getName());
+   if (nullProbe == null) {
+     return null;
+   }
+   //TODO: BC dates
+   String zonedText = resultSet.getString(column.getName());
+   return DateTime.parse(zonedText, TIMESTAMP_TIMEZONE);
+ }
+
+ /**
+  * Runs the SELECT built by getSelectQuery() and writes every returned row
+  * to the context's data writer as an object array ordered like the schema
+  * columns. rowsRead is incremented once per written row.
+  *
+  * <p>The statement and the result set are closed in {@code finally}
+  * blocks, so they are released even when reading or writing a row fails.
+  *
+  * @param context   extractor context supplying the schema and data writer
+  * @param jobConfig FROM-job settings used to build the query
+  * @throws RuntimeException wrapping any SQLException raised while
+  *         preparing, executing, reading or closing
+  */
+ private void extractData(ExtractorContext context, FromJobConfig jobConfig) {
+   String sql = getSelectQuery(jobConfig, context.getContext());
+   Column[] columns = context.getSchema().getColumnsArray();
+   int columnCount = columns.length;
+   try {
+     PreparedStatement statement = connection.prepareStatement(sql);
+     try {
+       ResultSet resultSet = statement.executeQuery();
+       try {
+         while (resultSet.next()) {
+           Object[] array = new Object[columnCount];
+           for (int i = 0; i < columnCount; i++) {
+             OracleTableColumn tableColumn =
+                 tableColumns.findColumnByName(columns[i].getName());
+             array[i] = getObjectAtName(resultSet, tableColumn, columns[i]);
+           }
+           context.getDataWriter().writeArrayRecord(array);
+           rowsRead++;
+         }
+       } finally {
+         resultSet.close();
+       }
+     } finally {
+       statement.close();
+     }
+   } catch (SQLException ex) {
+     // Kept in this method: getCurrentMethodName() is called from here.
+     LOG.error(String.format("Error in %s while executing the SQL query:\n"
+         + "%s\n\n" + "%s", OracleUtilities.getCurrentMethodName(), sql, ex
+         .getMessage()));
+     throw new RuntimeException(ex);
+   }
+ }
+
+ /**
+  * @return the number of rows written to the data writer so far
+  */
+ @Override
+ public long getRowsRead() {
+   return this.rowsRead;
+ }
+
+ private String getSelectQuery(FromJobConfig jobConfig,
+ ImmutableContext context) {
+
+ boolean consistentRead = BooleanUtils.isTrue(jobConfig.consistentRead);
+ long consistentReadScn = context.getLong(
+ OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN, 0L);
+ if (consistentRead && consistentReadScn == 0L) {
+ throw new RuntimeException("Could not get SCN for consistent read.");
+ }
+
+ StringBuilder query = new StringBuilder();
+
+ if (this.dbInputSplit.getDataChunks() == null) {
+ String errMsg =
+ String.format("The %s does not contain any data-chunks, within %s.",
+ this.dbInputSplit.getClass().getName(), OracleUtilities
+ .getCurrentMethodName());
+ throw new RuntimeException(errMsg);
+ }
+
+ OracleUtilities.OracleTableImportWhereClauseLocation whereClauseLocation =
+ OracleUtilities.getTableImportWhereClauseLocation(jobConfig,
+ OracleUtilities.OracleTableImportWhereClauseLocation.SUBSPLIT);
+
+ int numberOfDataChunks = this.dbInputSplit.getNumberOfDataChunks();
+ for (int idx = 0; idx < numberOfDataChunks; idx++) {
+
+ OracleDataChunk dataChunk =
+ this.dbInputSplit.getDataChunks().get(idx);
+
+ if (idx > 0) {
+ query.append("UNION ALL \n");
+ }
+
+ query.append(getColumnNamesClause(tableColumns,
+ dataChunk.getId(), jobConfig)) // <- SELECT clause
+ .append("\n");
+
+ query.append(" FROM ").append(table.toString()).append(" ");
+
+ if (consistentRead) {
+ query.append("AS OF SCN ").append(consistentReadScn).append(" ");
+ }
+
+ query.append(getPartitionClauseForDataChunk(this.dbInputSplit, idx))
+ .append(" t").append("\n");
+
+ query.append(" WHERE (").append(
+ getWhereClauseForDataChunk(this.dbInputSplit, idx)).append(")\n");
+
+ // If the user wants the WHERE clause applied to each data-chunk...
+ if (whereClauseLocation == OracleUtilities.
+ OracleTableImportWhereClauseLocation.SUBSPLIT) {
+ String conditions = jobConfig.conditions;
+ if (conditions != null && conditions.length() > 0) {
+ query.append(" AND (").append(conditions).append(")\n");
+ }
+ }
+
+ }
+
+ // If the user wants the WHERE clause applied to the whole split...
+ if (whereClauseLocation == OracleUtilities.
+ OracleTableImportWhereClauseLocation.SPLIT) {
+ String conditions = jobConfig.conditions;
+ if (conditions != null && conditions.length() > 0) {
+
+ // Insert a "select everything" line at the start of the SQL query...
+ query.insert(0, getColumnNamesClause(tableColumns, null, jobConfig) +
+ " FROM (\n");
+
+ // ...and then apply the WHERE clause to all the UNIONed sub-queries...
+ query.append(")\n").append("WHERE\n").append(conditions).append("\n");
+ }
+ }
+
+ LOG.info("SELECT QUERY = \n" + query.toString());
+
+ return query.toString();
+ }
+
+ /**
+  * Builds the "SELECT &lt;hint&gt;&lt;columns&gt;" part of a query over the given
+  * table columns. Columns whose data type equals
+  * {@code OracleJdbcConnectorConstants.Oracle.URITYPE} are selected as
+  * {@code uritype.geturl(col) col}. When a data-chunk id is supplied it is
+  * appended as a quoted literal aliased to
+  * {@code OracleJdbcConnectorConstants.COLUMN_NAME_DATA_CHUNK_ID}.
+  *
+  * @param tableColumns columns to list, in the order they are returned by
+  *        {@code get(i)}
+  * @param dataChunkId id to emit as an extra literal column; {@code null}
+  *        or empty to omit that column
+  * @param jobConfig FROM job configuration passed to
+  *        {@code OracleUtilities.getImportHint}
+  * @return the SELECT clause text, without a FROM clause
+  */
+ private String getColumnNamesClause(OracleTableColumns tableColumns,
+     String dataChunkId, FromJobConfig jobConfig) {
+
+   StringBuilder result = new StringBuilder();
+
+   result.append("SELECT ");
+   result.append(OracleUtilities.getImportHint(jobConfig));
+
+   int columnCount = tableColumns.size();
+   for (int i = 0; i < columnCount; i++) {
+     if (i > 0) {
+       result.append(",");
+     }
+
+     // The previous code null-checked the column only after it had already
+     // called getName() on it, so that guard could never take effect; it has
+     // been dropped rather than left to suggest null is handled here.
+     OracleTableColumn oracleTableColumn = tableColumns.get(i);
+     String fieldName = oracleTableColumn.getName();
+     if (oracleTableColumn.getDataType().equals(
+         OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+       fieldName = String.format("uritype.geturl(%s) %s", fieldName,
+           fieldName);
+     }
+
+     result.append(fieldName);
+   }
+
+   // We need to insert the value of that data_chunk_id now...
+   if (dataChunkId != null && !dataChunkId.isEmpty()) {
+     result.append(String.format(",'%s' %s", dataChunkId,
+         OracleJdbcConnectorConstants.COLUMN_NAME_DATA_CHUNK_ID));
+   }
+   return result.toString();
+ }
+
+ /**
+  * Fetches the partition clause of a single data-chunk of a partition.
+  *
+  * @param split partition whose data-chunk list is consulted
+  * @param dataChunkIndex index into {@code split.getDataChunks()}
+  * @return whatever {@code getPartitionClause()} returns for that chunk
+  */
+ private String getPartitionClauseForDataChunk(OracleJdbcPartition split,
+     int dataChunkIndex) {
+   return split.getDataChunks().get(dataChunkIndex).getPartitionClause();
+ }
+
+ /**
+  * Fetches the WHERE clause of a single data-chunk of a partition.
+  *
+  * @param split partition whose data-chunk list is consulted
+  * @param dataChunkIndex index into {@code split.getDataChunks()}
+  * @return whatever {@code getWhereClause()} returns for that chunk
+  */
+ private String getWhereClauseForDataChunk(OracleJdbcPartition split,
+     int dataChunkIndex) {
+   return split.getDataChunks().get(dataChunkIndex).getWhereClause();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
new file mode 100644
index 0000000..bd6fd0a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * Destroyer for the FROM direction of the Oracle JDBC connector.
+ * Its {@code destroy} method currently has an empty body.
+ */
+public class OracleJdbcFromDestroyer extends
+ Destroyer<LinkConfiguration, FromJobConfiguration> {
+
+ /**
+ * Currently a no-op: no clean-up has been implemented for the FROM side.
+ *
+ * @param context destroyer context (unused)
+ * @param linkConfiguration link configuration (unused)
+ * @param jobConfiguration FROM job configuration (unused)
+ */
+ @Override
+ public void destroy(DestroyerContext context,
+ LinkConfiguration linkConfiguration,
+ FromJobConfiguration jobConfiguration) {
+ // Nothing to clean up here yet: this method is still an empty stub.
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
new file mode 100644
index 0000000..62a0e84
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcFromInitializer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.SQLException;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+/**
+ * Initializer for the FROM direction of the Oracle JDBC connector. On top of
+ * the common initializer it resolves the source table, rejects
+ * index-organized tables unless the PARTITION data chunk method is selected,
+ * and publishes the SCN to use when a consistent read is requested.
+ */
+public class OracleJdbcFromInitializer extends
+    OracleJdbcCommonInitializer<FromJobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcFromInitializer.class);
+
+  /**
+   * Connects through the common initializer and then decodes the configured
+   * table name, using the link's user name, into the {@code table} field.
+   *
+   * @param context initializer context
+   * @param linkConfiguration link configuration providing the user name
+   * @param jobConfiguration FROM job configuration providing the table name
+   * @throws SQLException if the common initializer fails to connect
+   */
+  @Override
+  public void connect(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) throws SQLException {
+    super.connect(context, linkConfiguration, jobConfiguration);
+    table = OracleUtilities.decodeOracleTableName(
+        linkConfiguration.connectionConfig.username,
+        jobConfiguration.fromJobConfig.tableName);
+  }
+
+  /**
+   * Runs the common initialization, then performs the FROM-specific checks:
+   * <ul>
+   * <li>an index-organized table is only accepted when the data chunk method
+   * is PARTITION;</li>
+   * <li>when consistent read is enabled, the configured SCN (or, if it is
+   * missing or zero, the SCN reported by {@code OracleQueries.getCurrentScn})
+   * is stored in the job context under
+   * {@code ORACLE_IMPORT_CONSISTENT_READ_SCN}.</li>
+   * </ul>
+   *
+   * @param context initializer context whose mutable context receives the SCN
+   * @param linkConfiguration link configuration
+   * @param jobConfiguration FROM job configuration
+   * @throws RuntimeException if the table is index-organized and cannot be
+   *         processed, or if a database query needed for the checks fails
+   */
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) {
+    super.initialize(context, linkConfiguration, jobConfiguration);
+    LOG.debug("Running Oracle JDBC connector FROM initializer");
+
+    try {
+      if (OracleQueries.isTableAnIndexOrganizedTable(connection, table)) {
+        if (OracleUtilities.getOraOopOracleDataChunkMethod(
+            jobConfiguration.fromJobConfig) !=
+            OracleUtilities.OracleDataChunkMethod.PARTITION) {
+          // The chunk method is passed as a format argument rather than
+          // being concatenated into the format pattern itself.
+          throw new RuntimeException(String.format("Cannot process this Sqoop"
+              + " connection, as the Oracle table %s is an"
+              + " index-organized table. If the table is"
+              + " partitioned, set the data chunk method to %s.",
+              table.toString(),
+              OracleUtilities.OracleDataChunkMethod.PARTITION));
+        }
+      }
+    } catch (SQLException e) {
+      // Fixed: the two message halves used to be joined without a space
+      // ("is anindex-organized table.").
+      throw new RuntimeException(String.format(
+          "Unable to determine whether the Oracle table %s is an"
+          + " index-organized table.", table.toString()), e);
+    }
+
+    if (BooleanUtils.isTrue(jobConfiguration.fromJobConfig.consistentRead)) {
+      Long scn = jobConfiguration.fromJobConfig.consistentReadScn;
+      // No explicit SCN configured: fall back to the database's current one.
+      if (scn == null || scn.longValue() == 0L) {
+        try {
+          scn = OracleQueries.getCurrentScn(connection);
+        } catch (SQLException e) {
+          throw new RuntimeException("Unable to determine SCN of database.",
+              e);
+        }
+      }
+      context.getContext().setLong(
+          OracleJdbcConnectorConstants.ORACLE_IMPORT_CONSISTENT_READ_SCN,
+          scn);
+      LOG.info("Performing a consistent read using SCN: " + scn);
+    }
+  }
+
+}