You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/06/29 18:03:18 UTC
[7/7] git commit: SQOOP-1287: Add high performance Oracle connector
into Sqoop (David Robson via Venkat Ranganathan)
SQOOP-1287: Add high performance Oracle connector into Sqoop
(David Robson via Venkat Ranganathan)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6bfaa9d6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6bfaa9d6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6bfaa9d6
Branch: refs/heads/trunk
Commit: 6bfaa9d65395cf1cc365a4a6b9df492550c5c847
Parents: d03faf3
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Sun Jun 29 09:02:05 2014 -0700
Committer: Venkat Ranganathan <ve...@hortonworks.com>
Committed: Sun Jun 29 09:02:05 2014 -0700
----------------------------------------------------------------------
COMPILING.txt | 8 +-
build.xml | 5 +
conf/oraoop-site-template.xml | 103 ++
src/java/org/apache/sqoop/ConnFactory.java | 10 +-
.../sqoop/manager/oracle/OraOopConnManager.java | 630 +++++++
.../sqoop/manager/oracle/OraOopConstants.java | 512 ++++++
.../manager/oracle/OraOopDBInputSplit.java | 195 ++
.../manager/oracle/OraOopDBRecordReader.java | 468 +++++
.../oracle/OraOopDataDrivenDBInputFormat.java | 359 ++++
.../sqoop/manager/oracle/OraOopGenerics.java | 64 +
.../sqoop/manager/oracle/OraOopJdbcUrl.java | 232 +++
.../apache/sqoop/manager/oracle/OraOopLog.java | 235 +++
.../sqoop/manager/oracle/OraOopLogFactory.java | 54 +
.../sqoop/manager/oracle/OraOopLogMessage.java | 61 +
.../manager/oracle/OraOopManagerFactory.java | 1126 ++++++++++++
.../manager/oracle/OraOopOracleDataChunk.java | 68 +
.../oracle/OraOopOracleDataChunkExtent.java | 93 +
.../oracle/OraOopOracleDataChunkPartition.java | 78 +
.../manager/oracle/OraOopOracleQueries.java | 1687 ++++++++++++++++++
.../manager/oracle/OraOopOutputFormatBase.java | 713 ++++++++
.../oracle/OraOopOutputFormatInsert.java | 263 +++
.../oracle/OraOopOutputFormatUpdate.java | 418 +++++
.../sqoop/manager/oracle/OraOopUtilities.java | 1461 +++++++++++++++
.../manager/oracle/OracleActiveInstance.java | 44 +
.../manager/oracle/OracleConnectionFactory.java | 217 +++
.../sqoop/manager/oracle/OracleTable.java | 68 +
.../sqoop/manager/oracle/OracleTableColumn.java | 59 +
.../manager/oracle/OracleTableColumns.java | 43 +
.../manager/oracle/OracleTablePartition.java | 50 +
.../manager/oracle/OracleTablePartitions.java | 62 +
.../sqoop/manager/oracle/OracleVersion.java | 84 +
.../com/cloudera/sqoop/manager/OracleUtils.java | 10 +
.../sqoop/testutil/BaseSqoopTestCase.java | 2 +
.../sqoop/testutil/ExportJobTestCase.java | 2 +
.../sqoop/testutil/ImportJobTestCase.java | 4 +
src/test/oraoop/create_users.sql | 49 +
src/test/oraoop/pkg_tst_product_gen.pbk | 126 ++
src/test/oraoop/pkg_tst_product_gen.psk | 45 +
src/test/oraoop/table_tst_product.xml | 90 +
src/test/oraoop/table_tst_product_part.xml | 103 ++
.../oraoop/table_tst_product_special_chars.xml | 90 +
src/test/oraoop/table_tst_product_subpart.xml | 105 ++
.../apache/sqoop/manager/oracle/ExportTest.java | 68 +
.../apache/sqoop/manager/oracle/ImportTest.java | 241 +++
.../manager/oracle/OraOopOracleQueriesTest.java | 54 +
.../sqoop/manager/oracle/OraOopTestCase.java | 321 ++++
.../manager/oracle/OraOopTestConstants.java | 62 +
.../oracle/OracleConnectionFactoryTest.java | 520 ++++++
.../sqoop/manager/oracle/SystemImportTest.java | 315 ++++
.../TestOraOopDataDrivenDBInputFormat.java | 131 ++
.../sqoop/manager/oracle/TestOraOopJdbcUrl.java | 276 +++
.../manager/oracle/TestOraOopUtilities.java | 619 +++++++
.../sqoop/manager/oracle/TestOracleTable.java | 42 +
.../sqoop/manager/oracle/TimestampDataTest.java | 51 +
.../oracle/util/BigDecimalGenerator.java | 57 +
.../oracle/util/BinaryDoubleGenerator.java | 32 +
.../oracle/util/BinaryFloatGenerator.java | 32 +
.../manager/oracle/util/BlobGenerator.java | 103 ++
.../manager/oracle/util/BytesGenerator.java | 52 +
.../manager/oracle/util/CharGenerator.java | 54 +
.../manager/oracle/util/FloatGenerator.java | 57 +
.../sqoop/manager/oracle/util/HadoopFiles.java | 37 +
.../oracle/util/IntervalDaySecondGenerator.java | 64 +
.../oracle/util/IntervalYearMonthGenerator.java | 50 +
.../manager/oracle/util/NCharGenerator.java | 54 +
.../oracle/util/OraOopTestDataGenerator.java | 67 +
.../manager/oracle/util/OraOopTestUtils.java | 60 +
.../sqoop/manager/oracle/util/OracleData.java | 192 ++
.../oracle/util/OracleDataDefinition.java | 66 +
.../oracle/util/OracleTableDefinition.java | 150 ++
.../manager/oracle/util/RowIdGenerator.java | 64 +
.../manager/oracle/util/TimestampGenerator.java | 71 +
.../sqoop/manager/oracle/util/URIGenerator.java | 57 +
73 files changed, 14112 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/COMPILING.txt
----------------------------------------------------------------------
diff --git a/COMPILING.txt b/COMPILING.txt
index ddbed30..ae28411 100644
--- a/COMPILING.txt
+++ b/COMPILING.txt
@@ -85,9 +85,12 @@ jdbc:mysql://localhost/
=== Oracle
-Install Oracle XE (Express edition) 10.2.0. Instructions for configuring the
+Install Oracle Enterprise Edition 10.2.0+. Instructions for configuring the
database are in OracleManagerTest. Download the ojdbc6_g jar.
+If running the tests against Oracle XE (Express Edition) - a lot of them will
+fail as it does not include the partitioning feature.
+
Use the system property sqoop.test.oracle.connectstring to specify the
connection string for Oracle host used for testing. Specify this property on the
command line or via the build.properties file. For example:
@@ -97,6 +100,9 @@ sqoop.test.oracle.connectstring=jdbc:oracle:thin:@//host.example.com/xe
If not specified, the default value used for this property is:
jdbc:oracle:thin:@//localhost/xe
+Users sqooptest and sqooptest2 should be created prior to running the tests.
+SQL script is available in src/test/oraoop/create_users.sql
+
=== PostgreSQL
Install PostgreSQL 8.3.9. Download the postgresql 8.4 jdbc driver. Instructions
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 2dc99a8..ec5d2fa 100644
--- a/build.xml
+++ b/build.xml
@@ -827,6 +827,11 @@
<mkdir dir="${cobertura.class.dir}" />
<copy file="${test.dir}/fi-site.xml"
todir="${test.build.extraconf}" />
+ <copy file="${basedir}/conf/oraoop-site-template.xml"
+ todir="${test.build.extraconf}" />
+ <copy todir="${test.build.extraconf}/oraoop">
+ <fileset dir="${test.dir}/oraoop"/>
+ </copy>
<junit
printsummary="yes" showoutput="${test.output}"
haltonfailure="no" fork="yes" maxmemory="512m"
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/conf/oraoop-site-template.xml
----------------------------------------------------------------------
diff --git a/conf/oraoop-site-template.xml b/conf/oraoop-site-template.xml
new file mode 100644
index 0000000..eac9796
--- /dev/null
+++ b/conf/oraoop-site-template.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!-- Put OraOop-specific properties in this file. -->
+
+<configuration>
+
+ <property>
+ <name>oraoop.oracle.session.initialization.statements</name>
+ <value>alter session disable parallel query;
+ alter session set "_serial_direct_read"=true;
+ alter session set tracefile_identifier=oraoop;
+ --alter session set events '10046 trace name context forever, level 8';
+ </value>
+ <description>A semicolon-delimited list of Oracle statements that are executed, in order, to initialize each Oracle session.
+ Use {[property_name]|[default_value]} characters to refer to a Sqoop/Hadoop configuration property.
+ If the property does not exist, the specified default value will be used.
+ E.g. {oracle.sessionTimeZone|GMT} will equate to the value of the property named "oracle.sessionTimeZone" or
+ to "GMT" if this property has not been set.
+ </description>
+ </property>
+
+ <property>
+ <name>mapred.map.tasks.speculative.execution</name>
+ <value>false</value>
+ <description>Speculative execution is disabled to prevent redundant load on the Oracle database.
+ </description>
+ </property>
+
+ <property>
+ <name>oraoop.import.hint</name>
+ <value>NO_INDEX(t)</value>
+ <description>Hint to add to the SELECT statement for an IMPORT job.
+ The table will have an alias of t which can be used in the hint.
+ By default the NO_INDEX hint is applied to stop the use of an index.
+ To override this in oraoop-site.xml set the value to a blank string.
+ </description>
+ </property>
+
+<!--
+ <property>
+ <name>oraoop.block.allocation</name>
+ <value>ROUNDROBIN</value>
+ <description>Supported values are: ROUNDROBIN or SEQUENTIAL or RANDOM.
+ Refer to the OraOop documentation for more details.
+ </description>
+ </property>
+-->
+
+<!--
+ <property>
+ <name>oraoop.import.omit.lobs.and.long</name>
+ <value>false</value>
+ <description>If true, OraOop will omit BLOB, CLOB, NCLOB and LONG columns during an Import.
+ </description>
+ </property>
+-->
+
+<!--
+ <property>
+ <name>oraoop.table.import.where.clause.location</name>
+ <value>SUBSPLIT</value>
+ <description>Supported values are: SUBSPLIT or SPLIT.
+ Refer to the OraOop documentation for more details.
+ </description>
+ </property>
+-->
+
+<!--
+ <property>
+ <name>oraoop.oracle.append.values.hint.usage</name>
+ <value>AUTO</value>
+ <description>Supported values are: AUTO or ON or OFF.
+ ON:
+ OraOop will use the APPEND_VALUES Oracle hint during a Sqoop export, when inserting
+ data into an Oracle table.
+ OFF:
+ OraOop will not use the APPEND_VALUES Oracle hint during a Sqoop export.
+ AUTO:
+ For OraOop 1.1, the AUTO setting will not use the APPEND_VALUES hint.
+ </description>
+ </property>
+-->
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/ConnFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/ConnFactory.java b/src/java/org/apache/sqoop/ConnFactory.java
index 61d3307..2276525 100644
--- a/src/java/org/apache/sqoop/ConnFactory.java
+++ b/src/java/org/apache/sqoop/ConnFactory.java
@@ -43,6 +43,7 @@ import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.util.ClassLoaderStack;
import org.apache.sqoop.manager.GenericJdbcManager;
+import org.apache.sqoop.manager.oracle.OraOopManagerFactory;
/**
* Factory class to create the ConnManager type required
@@ -70,8 +71,12 @@ public class ConnFactory {
// The default value for sqoop.connection.factories is the
// name of the DefaultManagerFactory.
+ public static final String[] DEFAULT_FACTORY_CLASS_NAMES_ARR =
+ {OraOopManagerFactory.class.getName(),
+ DefaultManagerFactory.class.getName(), };
+
public static final String DEFAULT_FACTORY_CLASS_NAMES =
- DefaultManagerFactory.class.getName();
+ StringUtils.arrayToString(DEFAULT_FACTORY_CLASS_NAMES_ARR);
/** The list of ManagerFactory instances consulted by getManager().
*/
@@ -84,7 +89,8 @@ public class ConnFactory {
private void instantiateFactories(Configuration conf) {
loadManagersFromConfDir(conf);
String [] classNameArray =
- conf.getStrings(FACTORY_CLASS_NAMES_KEY, DEFAULT_FACTORY_CLASS_NAMES);
+ conf.getStrings(FACTORY_CLASS_NAMES_KEY,
+ DEFAULT_FACTORY_CLASS_NAMES_ARR);
for (String className : classNameArray) {
try {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
new file mode 100644
index 0000000..302849c
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.manager.GenericJdbcManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * OraOop manager for high performance Oracle import / export.
+ * NOTES:
+ * Escaping Column Names:
+ * ----------------------
+ * There are 3 main queries that occur during a Sqoop import.
+ * (1) Selecting columns to obtain their data-type via getColTypesQuery();
+ * (2) selecting column names via getColNamesQuery(); and
+ * (3) getting the data during the import via
+ * OraOopDBRecordReader.getSelectQuery();
+ * In each of these queries, we'd ideally escape the column names so that
+ * Oracle columns that require this work okay.
+ * Unfortunately we can't do this, because if the user specifies column
+ * names via the "--columns" clause, these names will be used (verbatim)
+ * during OraOopDBRecordReader.getSelectQuery(). This means that we could
+ * only escape the column names during OraOopDBRecordReader.getSelectQuery()
+ * if the user entered them in the correct case.
+ * Therefore, escapeColName() in this class does not actually do anything so
+ * that OraOopDBRecordReader.getSelectQuery() generates a valid SQL statement
+ * when the user utilises the "--columns" clause.
+ * However, getColTypesQuery() and getColNamesQuery() do escape column names
+ * via the method escapeOracleColumnName(). We also get getColumnTypes() to
+ * unescape the column names so that Sqoop has the most accurate column
+ * name strings.
+ */
+public class OraOopConnManager extends GenericJdbcManager {
+
+ public static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopConnManager.class.getName());
+ private List<String> columnNamesInOracleTable = null;
+ private Map<String, Integer> columnTypesInOracleTable = null;
+ private final String timestampJavaType;
+
+ public OraOopConnManager(final SqoopOptions sqoopOptions) {
+ super(OraOopConstants.ORACLE_JDBC_DRIVER_CLASS, sqoopOptions);
+ if (this.options.getConf().getBoolean(
+ OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING,
+ OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT)) {
+ timestampJavaType = "String";
+ } else {
+ timestampJavaType = super.toJavaType(Types.TIMESTAMP);
+ }
+ }
+
+ @Override
+ protected Connection makeConnection() throws SQLException {
+
+ String connectStr = this.options.getConnectString();
+ String username = this.options.getUsername();
+ String password = this.options.getPassword();
+ Properties additionalProps = this.options.getConnectionParams();
+
+ Connection connection =
+ OracleConnectionFactory.createOracleJdbcConnection(this
+ .getDriverClass(), connectStr, username, password, additionalProps);
+
+ return connection;
+ }
+
+ @Override
+ public void close() throws SQLException {
+
+ super.close();
+ }
+
+ /**
+ * Returns the column names of the Oracle table this manager is working on,
+ * loading them on first use and caching them in columnNamesInOracleTable.
+ *
+ * The table is identified via getOracleTableContext() (i.e. from the
+ * SqoopOptions), not from the argument.
+ *
+ * @param tableName not used by this method
+ * @return the value produced by OraOopUtilities.copyStringList() for the
+ * cached list, so that callers do not modify the cache
+ * @throws RuntimeException wrapping any SQLException raised while the
+ * column names are being queried
+ */
+ private List<String> getColumnNamesInOracleTable(String tableName) {
+
+ if (this.columnNamesInOracleTable == null) {
+
+ OracleTable tableContext = null;
+
+ try {
+ tableContext = getOracleTableContext();
+
+ Configuration conf = this.options.getConf();
+
+ this.columnNamesInOracleTable =
+ OraOopOracleQueries.getTableColumnNames(getConnection(),
+ tableContext, OraOopUtilities
+ .omitLobAndLongColumnsDuringImport(conf), OraOopUtilities
+ .recallSqoopJobType(conf), true, // <-
+ // onlyOraOopSupportedTypes
+ true // <- omitOraOopPseudoColumns
+ );
+ } catch (SQLException ex) {
+ // The callers of this method do not declare SQLException, so it is
+ // rethrown unchecked with the original exception as the cause.
+ throw new RuntimeException(ex);
+ }
+ }
+
+ // Return a copy of our list, so the original will not be inadvertently
+ // altered...
+ return OraOopUtilities.copyStringList(this.columnNamesInOracleTable);
+ }
+
+ /**
+  * Works out which columns of the Oracle table take part in the job.
+  *
+  * Starts from the columns reported by getColumnNamesInOracleTable() and,
+  * when the user supplied a "--columns" list, narrows the result to those
+  * columns. Names in the user's list are normalised first: unquoted names
+  * are upper-cased, quoted names have their quotes removed and keep their
+  * case. The normalised names are written back into the array returned by
+  * options.getColumns().
+  *
+  * @param tableName passed through to getColumnNamesInOracleTable()
+  * @return the column names to use, in the order in which
+  *         getColumnNamesInOracleTable() returned them
+  * @throws RuntimeException if "--columns" repeats a name, or names a
+  *         column that is not in the table's column list
+  */
+ protected List<String> getSelectedColumnNamesInOracleTable(String tableName) {
+
+   List<String> colNamesInTable = getColumnNamesInOracleTable(tableName);
+
+   String[] selectedColumns = this.options.getColumns();
+   if (selectedColumns != null && selectedColumns.length > 0) {
+
+     for (int idx = 0; idx < selectedColumns.length; idx++) {
+
+       String selectedColumn = selectedColumns[idx];
+       // If the user did not escape this column name, then we should
+       // uppercase it...
+       if (!isEscaped(selectedColumn)) {
+         selectedColumns[idx] = selectedColumn.toUpperCase();
+       } else {
+         // If the user escaped this column name, then we should
+         // retain its case...
+         selectedColumns[idx] = unescapeOracleColumnName(selectedColumn);
+       }
+     }
+
+     // Ensure there are no duplicated column names...
+     String[] duplicates =
+         OraOopUtilities
+             .getDuplicatedStringArrayValues(selectedColumns, false);
+     if (duplicates.length > 0) {
+       StringBuilder msg = new StringBuilder();
+       msg.append("The following column names have been duplicated in the ");
+       msg.append("\"--columns\" clause:\n");
+
+       for (String duplicate : duplicates) {
+         msg.append("\t").append(duplicate).append("\n");
+       }
+
+       throw new RuntimeException(msg.toString());
+     }
+
+     // Ensure the user selected column names that actually exist...
+     for (String selectedColumn : selectedColumns) {
+       if (!colNamesInTable.contains(selectedColumn)) {
+         OracleTable tableContext = getOracleTableContext();
+         // The trailing space after "table" keeps the table name from
+         // being glued to the preceding word in the rendered message.
+         throw new RuntimeException(String.format(
+             "The column named \"%s\" does not exist within the table "
+                 + "%s (or is of an unsupported data-type).", selectedColumn,
+             tableContext.toString()));
+       }
+     }
+
+     // Remove any columns (that exist in the table) that were not
+     // selected by the user. Iterating backwards keeps the remaining
+     // indexes valid while elements are removed.
+     for (int idx = colNamesInTable.size() - 1; idx >= 0; idx--) {
+       String colName = colNamesInTable.get(idx);
+       if (!OraOopUtilities.stringArrayContains(selectedColumns, colName,
+           false)) {
+         colNamesInTable.remove(idx);
+       }
+     }
+   }
+
+   // To assist development/testing of Oracle data-types, you can use this
+   // to limit the number of columns from the table...
+   int columnNameLimit =
+       this.options.getConf().getInt("oraoop.column.limit", 0);
+   if (columnNameLimit > 0) {
+     columnNameLimit = Math.min(columnNameLimit, colNamesInTable.size());
+     colNamesInTable = colNamesInTable.subList(0, columnNameLimit);
+   }
+
+   return colNamesInTable;
+ }
+
+ @Override
+ protected String getColTypesQuery(String tableName) {
+
+ List<String> colNames = getSelectedColumnNamesInOracleTable(tableName);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for (int idx = 0; idx < colNames.size(); idx++) {
+ if (idx > 0) {
+ sb.append(",");
+ }
+ sb.append(escapeOracleColumnName(colNames.get(idx))); // <- See notes at
+ // top about escaped
+ // column names
+ }
+ sb.append(String.format(" FROM %s WHERE 0=1", tableName));
+
+ return sb.toString();
+ }
+
+ @Override
+ protected String getColNamesQuery(String tableName) {
+
+ // NOTE: This code is similar to getColTypesQuery() - except the
+ // escaping of column names and table name differs.
+
+ List<String> colNames = getSelectedColumnNamesInOracleTable(tableName);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for (int idx = 0; idx < colNames.size(); idx++) {
+ if (idx > 0) {
+ sb.append(",");
+ }
+ sb.append(escapeColName(colNames.get(idx))); // <- See notes at top about
+ // escaped column names
+ }
+ sb.append(String.format(" FROM %s WHERE 1=0", escapeTableName(tableName)));
+
+ return sb.toString();
+ }
+
+ @Override
+ protected String getSplitColumn(SqoopOptions opts, String tableName) {
+
+ // If we're importing an Oracle table and will be generating
+ // "splits" based on its Oracle data-files, we don't actually require
+ // a primary key to exist, or for the user to identify the split-column.
+ // As a consequence, return "NotRequired" to prevent sqoop code
+ // such as SqlManager.importTable() from throwing an exception.
+ //
+ // NB: The tableName parameter will be null if no table is involved,
+ // such as when importing data via an (arbitrary) SQL query.
+ if (tableName != null) {
+ return OraOopConstants.TABLE_SPLIT_COLUMN_NOT_REQUIRED;
+ } else {
+ return super.getSplitColumn(opts, tableName);
+ }
+ }
+
+ @Override
+ public void importTable(ImportJobContext context) throws IOException,
+ ImportException {
+
+ logImportTableDetails(context);
+
+ context.setConnManager(this);
+
+ // Specify the Oracle-specific DBInputFormat for import.
+ context.setInputFormat(OraOopDataDrivenDBInputFormat.class);
+
+ super.importTable(context);
+ }
+
+ /**
+  * Exports HDFS data into the Oracle table using OraOopOutputFormatInsert.
+  *
+  * Before the job is started, the job configuration is flagged when the
+  * cached column types contain a BINARY_DOUBLE and/or BINARY_FLOAT column.
+  *
+  * @param context the export job context; its options' Configuration is
+  *        updated and this manager is installed as its ConnManager
+  * @throws IOException propagated from the export job
+  * @throws ExportException if the column types have not been cached yet
+  *         (the cache is filled by getColumnTypes()), or propagated from
+  *         the export job
+  */
+ @Override
+ public void exportTable(ExportJobContext context) throws IOException,
+     ExportException {
+
+   logExportTableDetails(context);
+
+   if (this.columnTypesInOracleTable == null) {
+     // The space before "known." was missing, which rendered the message
+     // as "...are notknown.".
+     throw new ExportException("The column-types for the table are not "
+         + "known.");
+   }
+   if (this.columnTypesInOracleTable.containsValue(OraOopOracleQueries
+       .getOracleType("BINARY_DOUBLE"))) {
+     context.getOptions().getConf().setBoolean(
+         OraOopConstants.TABLE_CONTAINS_BINARY_DOUBLE_COLUMN, true);
+   }
+   if (this.columnTypesInOracleTable.containsValue(OraOopOracleQueries
+       .getOracleType("BINARY_FLOAT"))) {
+     context.getOptions().getConf().setBoolean(
+         OraOopConstants.TABLE_CONTAINS_BINARY_FLOAT_COLUMN, true);
+   }
+
+   context.setConnManager(this);
+
+   @SuppressWarnings("rawtypes")
+   Class<? extends OutputFormat> oraOopOutputFormatClass;
+   try {
+     // Resolving the class literal is what can raise NoClassDefFoundError
+     // when a class it depends on is missing from the classpath.
+     oraOopOutputFormatClass = OraOopOutputFormatInsert.class;
+   } catch (NoClassDefFoundError ex) {
+     explainWhyExportClassCannotBeLoaded(ex, "OraOopOutputFormatInsert");
+     throw ex;
+   }
+   JdbcExportJob exportJob =
+       new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+   exportJob.runExport();
+ }
+
+ @Override
+ public void updateTable(ExportJobContext context) throws IOException,
+ ExportException {
+
+ logExportTableDetails(context);
+
+ context.setConnManager(this);
+
+ @SuppressWarnings("rawtypes")
+ Class<? extends OutputFormat> oraOopOutputFormatClass;
+ try {
+ oraOopOutputFormatClass = OraOopOutputFormatUpdate.class;
+ } catch (NoClassDefFoundError ex) {
+ explainWhyExportClassCannotBeLoaded(ex, "OraOopOutputFormatUpdate");
+ throw ex;
+ }
+
+ JdbcUpdateExportJob exportJob =
+ new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+ exportJob.runExport();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+
+ close();
+ super.finalize();
+ }
+
+ @Override
+ public String toHiveType(int sqlType) {
+
+ String hiveType = super.toHiveType(sqlType);
+
+ if (hiveType == null) {
+
+ // http://wiki.apache.org/hadoop/Hive/Tutorial#Primitive_Types
+
+ if (sqlType == OraOopOracleQueries.getOracleType("BFILE")
+ || sqlType == OraOopOracleQueries.getOracleType("INTERVALYM")
+ || sqlType == OraOopOracleQueries.getOracleType("INTERVALDS")
+ || sqlType == OraOopOracleQueries.getOracleType("NCLOB")
+ || sqlType == OraOopOracleQueries.getOracleType("NCHAR")
+ || sqlType == OraOopOracleQueries.getOracleType("NVARCHAR")
+ || sqlType == OraOopOracleQueries.getOracleType("OTHER")
+ || sqlType == OraOopOracleQueries.getOracleType("ROWID")
+ || sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPTZ")
+ || sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPLTZ")
+ || sqlType == OraOopOracleQueries.getOracleType("STRUCT")) {
+ hiveType = "STRING";
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("BINARY_FLOAT")) {
+ hiveType = "FLOAT";
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("BINARY_DOUBLE")) {
+ hiveType = "DOUBLE";
+ }
+ }
+
+ if (hiveType == null) {
+ LOG.warn(String.format("%s should be updated to cater for data-type: %d",
+ OraOopUtilities.getCurrentMethodName(), sqlType));
+ }
+
+ return hiveType;
+ }
+
+ @Override
+ public String toJavaType(int sqlType) {
+
+ String javaType = super.toJavaType(sqlType);
+
+ if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMP")) {
+ // Get the Oracle JDBC driver to convert this value to a string
+ // instead of the generic JDBC driver.
+ // If the generic JDBC driver is used, it will take into account the
+ // timezone of the client machine's locale. The problem with this is that
+ // timestamp data should not be associated with a timezone. In practice,
+ // this
+ // leads to problems, for example, the time '2010-10-03 02:01:00' being
+ // changed to '2010-10-03 03:01:00' if the client machine's locale is
+ // Melbourne.
+ // (This is in response to daylight saving starting in Melbourne on
+ // this date at 2am.)
+ javaType = timestampJavaType;
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPTZ")) {
+ // Returning "String" produces: "2010-08-08 09:00:00.0 +10:00"
+ // Returning "java.sql.Timestamp" produces: "2010-08-08 09:00:00.0"
+
+ // If we use "java.sql.Timestamp", the field's value will not
+ // contain the timezone when converted to a string and written to the HDFS
+ // CSV file.
+ // I.e. Get the Oracle JDBC driver to convert this value to a string
+ // instead of the generic JDBC driver...
+ javaType = timestampJavaType;
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("TIMESTAMPLTZ")) {
+ // Returning "String" produces:
+ // "2010-08-08 09:00:00.0 Australia/Melbourne"
+ // Returning "java.sql.Timestamp" produces: "2010-08-08 09:00:00.0"
+ javaType = timestampJavaType;
+ }
+
+ /*
+ * http://www.oracle.com/technology/sample_code/tech/java/sqlj_jdbc/files
+ * /oracle10g/ieee/Readme.html
+ *
+ * BINARY_DOUBLE is a 64-bit, double-precision floating-point number
+ * datatype. (IEEE 754) Each BINARY_DOUBLE value requires 9 bytes, including
+ * a length byte. A 64-bit double format number X is divided as sign s 1-bit
+ * exponent e 11-bits fraction f 52-bits
+ *
+ * BINARY_FLOAT is a 32-bit, single-precision floating-point number
+ * datatype. (IEEE 754) Each BINARY_FLOAT value requires 5 bytes, including
+ * a length byte. A 32-bit single format number X is divided as sign s 1-bit
+ * exponent e 8-bits fraction f 23-bits
+ */
+ if (sqlType == OraOopOracleQueries.getOracleType("BINARY_FLOAT")) {
+ // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+ javaType = "Float";
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("BINARY_DOUBLE")) {
+ // http://people.uncw.edu/tompkinsj/133/numbers/Reals.htm
+ javaType = "Double";
+ }
+
+ if (sqlType == OraOopOracleQueries.getOracleType("STRUCT")) {
+ // E.g. URITYPE
+ javaType = "String";
+ }
+
+ if (javaType == null) {
+
+ // For constant values, refer to:
+ // http://oracleadvisor.com/documentation/oracle/database/11.2/
+ // appdev.112/e13995/constant-values.html#oracle_jdbc
+
+ if (sqlType == OraOopOracleQueries.getOracleType("BFILE")
+ || sqlType == OraOopOracleQueries.getOracleType("NCLOB")
+ || sqlType == OraOopOracleQueries.getOracleType("NCHAR")
+ || sqlType == OraOopOracleQueries.getOracleType("NVARCHAR")
+ || sqlType == OraOopOracleQueries.getOracleType("ROWID")
+ || sqlType == OraOopOracleQueries.getOracleType("INTERVALYM")
+ || sqlType == OraOopOracleQueries.getOracleType("INTERVALDS")
+ || sqlType == OraOopOracleQueries.getOracleType("OTHER")) {
+ javaType = "String";
+ }
+
+ }
+
+ if (javaType == null) {
+ LOG.warn(String.format("%s should be updated to cater for data-type: %d",
+ OraOopUtilities.getCurrentMethodName(), sqlType));
+ }
+
+ return javaType;
+ }
+
+ @Override
+ public String timestampToQueryString(Timestamp ts) {
+
+ return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ }
+
+ public OracleTable getOracleTableContext() {
+
+ return OraOopUtilities.decodeOracleTableName(this.options.getUsername(),
+ this.options.getTableName(), this.options.getConf());
+ }
+
+ /**
+ * Returns the SQL type of each column of the Oracle table, keyed by the
+ * unescaped column name.
+ *
+ * The map is built on the first call and cached in
+ * columnTypesInOracleTable; later calls return the cached map regardless
+ * of the argument. Only columns that appear both in the result of
+ * super.getColumnTypes() and in getColumnNamesInOracleTable() are included.
+ *
+ * @param tableName forwarded to super.getColumnTypes() and
+ * getColumnNamesInOracleTable() on the first call
+ * @return the cached map itself (not a copy)
+ */
+ @Override
+ public Map<String, Integer> getColumnTypes(String tableName) {
+
+ if (this.columnTypesInOracleTable == null) {
+
+ Map<String, Integer> columnTypes = super.getColumnTypes(tableName);
+ // NOTE(review): the cache field is assigned before the loop below runs,
+ // so an exception thrown while filling it (e.g. if columnTypes were
+ // null) would leave an empty map cached - confirm whether
+ // super.getColumnTypes() can return null.
+ this.columnTypesInOracleTable = new HashMap<String, Integer>();
+
+ List<String> colNames = getColumnNamesInOracleTable(tableName);
+
+ for (int idx = 0; idx < colNames.size(); idx++) {
+
+ String columnNameInTable = colNames.get(idx);
+ if (columnTypes.containsKey(columnNameInTable)) {
+
+ // Unescape the column names being returned...
+ int colType = columnTypes.get(columnNameInTable);
+ String key = unescapeOracleColumnName(columnNameInTable); // <- See
+ // notes at
+ // top about
+ // escaped
+ // column
+ // names
+ this.columnTypesInOracleTable.put(key, colType);
+ }
+ }
+ }
+
+ return this.columnTypesInOracleTable;
+ }
+
+ /**
+  * Tells whether a column name is wrapped in double quotes.
+  *
+  * @param name the column name to inspect; must not be null
+  * @return true if the name is at least two characters long and both
+  *         starts and ends with a double-quote character
+  */
+ private boolean isEscaped(String name) {
+
+   // A lone '"' both starts and ends with a quote; without the length
+   // check it would be reported as escaped and stripping the quotes would
+   // call substring(1, 0), which throws StringIndexOutOfBoundsException.
+   return name.length() >= 2 && name.startsWith("\"") && name.endsWith("\"");
+ }
+
+ private String escapeOracleColumnName(String columnName) {
+ // See notes at top about escaped column names
+ if (isEscaped(columnName)) {
+ return columnName;
+ } else {
+ return "\"" + columnName + "\"";
+ }
+ }
+
+ @Override
+ public String escapeColName(String colName) {
+
+ return super.escapeColName(colName); // <- See notes at top about escaped
+ // column names
+ }
+
+ private String unescapeOracleColumnName(String columnName) {
+
+ if (isEscaped(columnName)) {
+ return columnName.substring(1, columnName.length() - 1);
+ } else {
+ return columnName;
+ }
+ }
+
+ private void logImportTableDetails(ImportJobContext context) {
+
+ Path outputDirectory = context.getDestination();
+ if (outputDirectory != null) {
+ LOG.debug("The output directory for the sqoop table import is : "
+ + outputDirectory.getName());
+ }
+
+ // Indicate whether we can load the class named: OraOopOraStats
+ showUserWhetherOraOopOraStatsIsAvailable(context.getOptions().getConf());
+ }
+
+ private void logExportTableDetails(ExportJobContext context) {
+
+ // Indicate whether we can load the class named: OraOopOraStats
+ showUserWhetherOraOopOraStatsIsAvailable(context.getOptions().getConf());
+
+ // Indicate what the update/merge columns are...
+ String[] updateKeyColumns =
+ OraOopUtilities.getExportUpdateKeyColumnNames(context.getOptions());
+ if (updateKeyColumns.length > 0) {
+ LOG.info(String.format(
+ "The column%s used to match rows in the HDFS file with rows in "
+ + "the Oracle table %s: %s", updateKeyColumns.length > 1 ? "s"
+ : "", updateKeyColumns.length > 1 ? "are" : "is", OraOopUtilities
+ .stringArrayToCSV(updateKeyColumns)));
+ }
+ }
+
+ private void showUserWhetherOraOopOraStatsIsAvailable(Configuration conf) {
+
+ if (OraOopUtilities.userWantsOracleSessionStatisticsReports(conf)) {
+
+ LOG.info(String.format("%s=true",
+ OraOopConstants.ORAOOP_REPORT_SESSION_STATISTICS));
+
+ // This will log a warning if it's unable to load the OraOopOraStats
+ // class...
+ OraOopUtilities.startSessionSnapshot(null);
+ }
+ }
+
+ @Override
+ protected String getCurTimestampQuery() {
+
+ return "SELECT SYSTIMESTAMP FROM DUAL";
+ }
+
+ @Override
+ protected void checkTableImportOptions(ImportJobContext context)
+ throws IOException, ImportException {
+
+ // Update the unit-test code if you modify this method.
+ super.checkTableImportOptions(context);
+ }
+
+ private void explainWhyExportClassCannotBeLoaded(NoClassDefFoundError ex,
+ String exportClassName) {
+
+ String msg =
+ String.format("Unable to load class %s.\n"
+ + "This is most likely caused by the Cloudera Shim Jar "
+ + "not being included in the Java Classpath.\n" + "Either:\n"
+ + "\tUse \"-libjars\" on the Sqoop command-line to "
+ + "include the Cloudera shim jar in the Java Classpath; or"
+ + "\n\tCopy the Cloudera shim jar into the Sqoop/lib "
+ + "directory so that it is automatically included in the "
+ + "Java Classpath; or\n"
+ + "\tObtain an updated version of Sqoop that addresses "
+ + "the Sqoop Jira \"SQOOP-127\".\n" + "\n"
+ + "The Java Classpath is:\n%s", exportClassName, OraOopUtilities
+ .getJavaClassPath());
+ LOG.fatal(msg, ex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
new file mode 100644
index 0000000..874ef02
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConstants.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+/**
+ * Constants for OraOop.
+ */
+public final class OraOopConstants {
+ private OraOopConstants() {
+ }
+
+ public static final String ORAOOP_PRODUCT_NAME =
+ "Data Connector for Oracle and Hadoop";
+ public static final String ORAOOP_JAR_FILENAME = "oraoop.jar";
+
+ // Disables OraOop - falling back to the OracleManager inside Sqoop...
+ public static final String ORAOOP_DISABLED = "oraoop.disabled";
+
+ // Whether to log Oracle session statistics using Guy Harrison's jar file...
+ public static final String ORAOOP_REPORT_SESSION_STATISTICS =
+ "oraoop.report.session.statistics";
+
+ // Disables dynamic JDBC URL generation for each mapper...
+ public static final String ORAOOP_JDBC_URL_VERBATIM =
+ "oraoop.jdbc.url.verbatim";
+
+ // The name of the Oracle RAC service each mapper should connect to, via their
+ // dynamically generated JDBC URL...
+ public static final String ORAOOP_ORACLE_RAC_SERVICE_NAME =
+ "oraoop.oracle.rac.service.name";
+
+ // The log4j log-level for OraOop...
+ public static final String ORAOOP_LOGGING_LEVEL = "oraoop.logging.level";
+
+ // The file names for the configuration properties of OraOop...
+ public static final String ORAOOP_SITE_TEMPLATE_FILENAME =
+ "oraoop-site-template.xml";
+ public static final String ORAOOP_SITE_FILENAME = "oraoop-site.xml";
+
+ // A flag that indicates that the OraOop job has been cancelled.
+ // E.g. An Oracle DBA killed our Oracle session.
+ // public static final String ORAOOP_JOB_CANCELLED = "oraoop.job.cancelled";
+
+ // The SYSDATE from the Oracle database when this OraOop job was started.
+ // This is used to generate unique names for partitions and temporary tables
+ // that we create during the job...
+ public static final String ORAOOP_JOB_SYSDATE = "oraoop.job.sysdate";
+
+ // These properties are used internally by OraOop to indicate the schema and
+ // name of
+ // the table being imported/exported...
+ public static final String ORAOOP_TABLE_OWNER = "oraoop.table.owner";
+ public static final String ORAOOP_TABLE_NAME = "oraoop.table.name";
+
+ // Constants used to indicate the desired location of the WHERE clause within
+ // the SQL generated by the record-reader.
+ // E.g. A WHERE clause like "rownum <= 10" would want to be located so that
+ // it had an impact on the total number of rows returned by the split;
+ // as opposed to impacting the number of rows returned for each of the
+ // unioned data-chunks within each split.
+ public static final String ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION =
+ "oraoop.table.import.where.clause.location";
+
+ /**
+ * Location to place the WHERE clause.
+ */
+ public enum OraOopTableImportWhereClauseLocation {
+ SUBSPLIT, SPLIT
+ }
+
+ // The SQL statements to execute for each new Oracle session that is
+ // created...
+ public static final String ORAOOP_SESSION_INITIALIZATION_STATEMENTS =
+ "oraoop.oracle.session.initialization.statements";
+
+ // Reliably stores the number of mappers requested for the sqoop map-reduce
+ // job...
+ public static final String ORAOOP_DESIRED_NUMBER_OF_MAPPERS =
+ "oraoop.desired.num.mappers";
+
+ // The minimum number of mappers required for OraOop to accept the import
+ // job...
+ public static final String ORAOOP_MIN_IMPORT_MAPPERS =
+ "oraoop.min.import.mappers";
+ public static final int MIN_NUM_IMPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+
+ // The minimum number of mappers required for OraOop to accept the export
+ // job...
+ public static final String ORAOOP_MIN_EXPORT_MAPPERS =
+ "oraoop.min.export.mappers";
+ public static final int MIN_NUM_EXPORT_MAPPERS_ACCEPTED_BY_ORAOOP = 2;
+
+ // The query used to fetch oracle data chunks...
+ public static final String ORAOOP_ORACLE_DATA_CHUNKS_QUERY =
+ "oraoop.oracle.data.chunks.query";
+
+ // The minimum number of active instances in an Oracle RAC required for OraOop
+ // to use dynamically generated JDBC URLs...
+ public static final String ORAOOP_MIN_RAC_ACTIVE_INSTANCES =
+ "oraoop.min.rac.active.instances";
+ public static final int MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS =
+ 2;
+
+ // The name of the Oracle JDBC class...
+ public static final String ORACLE_JDBC_DRIVER_CLASS =
+ "oracle.jdbc.OracleDriver";
+
+ // How many rows to pre-fetch when executing Oracle queries...
+ public static final String ORACLE_ROW_FETCH_SIZE = "oracle.row.fetch.size";
+ public static final int ORACLE_ROW_FETCH_SIZE_DEFAULT = 5000;
+
+ // OraOop does not require a "--split-by" column to be defined...
+ public static final String TABLE_SPLIT_COLUMN_NOT_REQUIRED = "not-required";
+
+ // The name of the data_chunk_id column that OraOop appends to each (import)
+ // query...
+ public static final String COLUMN_NAME_DATA_CHUNK_ID = "data_chunk_id";
+
+ // The hint that will be used on the SELECT statement for import jobs
+ public static final String IMPORT_QUERY_HINT = "oraoop.import.hint";
+
+ // Pseudo-columns added to a partitioned export table (created by OraOop from
+ // a template table)
+ // to store the partition value and subpartition value. The partition value is
+ // the sysdate when
+ // the job was performed. The subpartition value is the mapper index...
+ public static final String COLUMN_NAME_EXPORT_PARTITION =
+ "ORAOOP_EXPORT_SYSDATE";
+ public static final String COLUMN_NAME_EXPORT_SUBPARTITION =
+ "ORAOOP_MAPPER_ID";
+ public static final String COLUMN_NAME_EXPORT_MAPPER_ROW =
+ "ORAOOP_MAPPER_ROW";
+
+ public static final String ORAOOP_EXPORT_PARTITION_DATE_VALUE =
+ "oraoop.export.partition.date.value";
+ public static final String ORAOOP_EXPORT_PARTITION_DATE_FORMAT =
+ "yyyy-mm-dd hh24:mi:ss";
+
+ // The string we want to pass to dbms_application_info.set_module() via the
+ // "module_name" parameter...
+ public static final String ORACLE_SESSION_MODULE_NAME = ORAOOP_PRODUCT_NAME;
+
+ // The name of the configuration property containing the string we want to
+ // pass to
+ // dbms_application_info.set_module() via the "action_name" parameter...
+ public static final String ORACLE_SESSION_ACTION_NAME =
+ "oraoop.oracle.session.module.action";
+
+ // Boolean whether to do a consistent read based off an SCN
+ public static final String ORAOOP_IMPORT_CONSISTENT_READ =
+ "oraoop.import.consistent.read";
+
+ // The SCN number to use for the consistent read - calculated automatically -
+ // cannot be overridden
+ public static final String ORAOOP_IMPORT_CONSISTENT_READ_SCN =
+ "oraoop.import.consistent.read.scn";
+
+ // The method that will be used to create data chunks - ROWID ranges or
+ // partitions
+ public static final String ORAOOP_ORACLE_DATA_CHUNK_METHOD =
+ "oraoop.chunk.method";
+
+ /**
+ * How should data be split up - by ROWID range, or by partition.
+ */
+ public enum OraOopOracleDataChunkMethod {
+ ROWID, PARTITION
+ }
+
+ // List of partitions to be imported, comma-separated list
+ public static final String ORAOOP_IMPORT_PARTITION_LIST =
+ "oraoop.import.partitions";
+
+ public static final OraOopOracleDataChunkMethod
+ ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT =
+ OraOopOracleDataChunkMethod.ROWID;
+
+ // How to allocate data-chunks into splits...
+ public static final String ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD =
+ "oraoop.block.allocation";
+
+ /**
+ * How splits should be allocated to the mappers.
+ */
+ public enum OraOopOracleBlockToSplitAllocationMethod {
+ ROUNDROBIN, SEQUENTIAL, RANDOM
+ }
+
+ // Whether to omit LOB and LONG columns during an import...
+ public static final String ORAOOP_IMPORT_OMIT_LOBS_AND_LONG =
+ "oraoop.import.omit.lobs.and.long";
+
+ // Identifies an existing Oracle table used to create a new table as the
+ // destination of a Sqoop export.
+ // Hence, use of this property implies that the "-table" does not exist in
+ // Oracle and OraOop should create it.
+ public static final String ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE =
+ "oraoop.template.table";
+
+ // If the table already exists that we want to create, should we drop it?...
+ public static final String ORAOOP_EXPORT_CREATE_TABLE_DROP =
+ "oraoop.drop.table";
+
+ // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+ // indicates whether the created Oracle
+ // tables should have NOLOGGING...
+ public static final String ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING =
+ "oraoop.no.logging";
+
+ // If ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE has been specified, then this flag
+ // indicates whether the created Oracle
+ // tables should be partitioned by job and mapper...
+ public static final String ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED =
+ "oraoop.partitioned";
+
+ // Indicates (internally) that the export table we're dealing with has been
+ // partitioned by OraOop...
+ public static final String EXPORT_TABLE_HAS_ORAOOP_PARTITIONS =
+ "oraoop.export.table.has.oraoop.partitions";
+
+ // When using the Oracle hint... /*+ APPEND_VALUES */ ...a commit must be
+ // performed after each batch insert.
+ // Therefore, the batches need to be quite large to avoid a performance
+ // penalty (for the 'extra' commits).
+ // This is the minimum batch size to use under these conditions...
+ public static final String ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE =
+ "oraoop.min.append.values.batch.size";
+ public static final int ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT = 5000;
+
+ // The version of the Oracle database we're connected to...
+ public static final String ORAOOP_ORACLE_DATABASE_VERSION_MAJOR =
+ "oraoop.oracle.database.version.major";
+ public static final String ORAOOP_ORACLE_DATABASE_VERSION_MINOR =
+ "oraoop.oracle.database.version.minor";
+
+ // When OraOop creates a table for a Sqoop export (from a template table) and
+ // the table contains partitions,
+ // this is the prefix of those partition names. (This also allows us to later
+ // identify partitions that OraOop
+ // created.)
+ public static final String EXPORT_TABLE_PARTITION_NAME_PREFIX = "ORAOOP_";
+
+ // When OraOop creates temporary tables for each mapper during a Sqoop export
+ // this is the prefix of table names...
+ public static final String EXPORT_MAPPER_TABLE_NAME_PREFIX = "ORAOOP_";
+
+ // The format string used to turn a DATE into a string for use within the
+ // names of Oracle objects
+ // that we create. For example, temporary tables, table partitions, table
+ // subpartitions...
+ public static final String ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING =
+ "yyyymmdd_hh24miss";
+
+ // Indicates whether to perform a "merge" operation when performing a Sqoop
+ // export.
+ // If false, 'insert' statements will be used (i.e. no 'updates')...
+ public static final String ORAOOP_EXPORT_MERGE = "oraoop.export.merge";
+
+ // This property allows the user to enable parallelization during exports...
+ public static final String ORAOOP_EXPORT_PARALLEL =
+ "oraoop.export.oracle.parallelization.enabled";
+
+ // Flag used to indicate that the Oracle table contains at least one column of
+ // type BINARY_DOUBLE...
+ public static final String TABLE_CONTAINS_BINARY_DOUBLE_COLUMN =
+ "oraoop.table.contains.binary.double.column";
+ // Flag used to indicate that the Oracle table contains at least one column of
+ // type BINARY_FLOAT...
+ public static final String TABLE_CONTAINS_BINARY_FLOAT_COLUMN =
+ "oraoop.table.contains.binary.float.column";
+
+ // The storage clause to append to the end of any CREATE TABLE statements we
+ // execute for temporary Oracle tables...
+ public static final String ORAOOP_TEMPORARY_TABLE_STORAGE_CLAUSE =
+ "oraoop.temporary.table.storage.clause";
+
+ // The storage clause to append to the end of any CREATE TABLE statements we
+ // execute for permanent (export) Oracle tables...
+ public static final String ORAOOP_EXPORT_TABLE_STORAGE_CLAUSE =
+ "oraoop.table.storage.clause";
+
+ // Additional columns to include with the --update-key column...
+ public static final String ORAOOP_UPDATE_KEY_EXTRA_COLUMNS =
+ "oraoop.update.key.extra.columns";
+
+ // Should OraOop map Timestamps as java.sql.Timestamp as Sqoop does, or as
+ // String
+ public static final String ORAOOP_MAP_TIMESTAMP_AS_STRING =
+ "oraoop.timestamp.string";
+ public static final boolean ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT = true;
+
+ // This flag allows the user to force use of the APPEND_VALUES Oracle hint
+ // either ON, OFF or AUTO...
+ public static final String ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE =
+ "oraoop.oracle.append.values.hint.usage";
+
+ /**
+ * Whether to use the append values hint for exports.
+ */
+ public enum AppendValuesHintUsage {
+ AUTO, ON, OFF
+ }
+
+ // http://download.oracle.com/docs/cd/E11882_01/server.112/e17118/
+ // sql_elements001.htm#i45441
+ public static final String SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE =
+ "(DATA_TYPE IN ("
+ +
+ // "'BFILE',"+
+ "'BINARY_DOUBLE',"
+ + "'BINARY_FLOAT',"
+ + "'BLOB',"
+ + "'CHAR',"
+ + "'CLOB',"
+ + "'DATE',"
+ + "'FLOAT',"
+ + "'LONG',"
+ +
+ // "'LONG RAW',"+
+ // "'MLSLABEL',"+
+ "'NCHAR',"
+ + "'NCLOB',"
+ + "'NUMBER',"
+ + "'NVARCHAR2',"
+ + "'RAW',"
+ + "'ROWID',"
+ +
+ // "'UNDEFINED',"+
+ "'URITYPE',"
+ +
+ // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+ // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+ "'VARCHAR2'"
+ + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+ // dba_tab_columns
+ // "'XMLTYPE',"+
+ ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+ + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+ public static final String SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE =
+ "(DATA_TYPE IN ("
+ +
+ // "'BFILE',"+
+ "'BINARY_DOUBLE',"
+ + "'BINARY_FLOAT',"
+ +
+ // "'BLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+ "'CHAR',"
+ +
+ // "'CLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+ "'DATE',"
+ + "'FLOAT',"
+ +
+ // "'LONG',"+ //<- "create table as select..." and
+ // "insert into table as select..." do not work when a long column
+ // exists.
+ // "'LONG RAW',"+
+ // "'MLSLABEL',"+
+ "'NCHAR',"
+ +
+ // "'NCLOB',"+ //<- Jira: SQOOP-117 Sqoop cannot export LOB data
+ "'NUMBER',"
+ + "'NVARCHAR2',"
+ +
+ // "'RAW',"+
+ "'ROWID',"
+ +
+ // "'UNDEFINED',"+
+ "'URITYPE',"
+ +
+ // "'UROWID',"+ //<- SqlType = 1111 = "OTHER" Not supported as
+ // "AAAAACAADAAAAAEAAF" is being returned as "AAAAAAgADAAAA"
+ "'VARCHAR2'"
+ + // <- Columns declared as VARCHAR are listed as VARCHAR2 in
+ // dba_tab_columns
+ // "'XMLTYPE',"+
+ ")" + " OR DATA_TYPE LIKE 'INTERVAL YEAR(%) TO MONTH'"
+ + " OR DATA_TYPE LIKE 'INTERVAL DAY(%) TO SECOND(%)'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%)'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH TIME ZONE'"
+ + " OR DATA_TYPE LIKE 'TIMESTAMP(%) WITH LOCAL TIME ZONE'" + ")";
+
+ // public static final int[] SUPPORTED_ORACLE_DATA_TYPES = {
+ // oracle.jdbc.OracleTypes.BIT // -7;
+ // ,oracle.jdbc.OracleTypes.TINYINT // -6;
+ // ,oracle.jdbc.OracleTypes.SMALLINT // 5;
+ // ,oracle.jdbc.OracleTypes.INTEGER // 4;
+ // ,oracle.jdbc.OracleTypes.BIGINT // -5;
+ // ,oracle.jdbc.OracleTypes.FLOAT // 6;
+ // ,oracle.jdbc.OracleTypes.REAL // 7;
+ // ,oracle.jdbc.OracleTypes.DOUBLE // 8;
+ // ,oracle.jdbc.OracleTypes.NUMERIC // 2;
+ // ,oracle.jdbc.OracleTypes.DECIMAL // 3;
+ // ,oracle.jdbc.OracleTypes.CHAR // 1;
+ // ,oracle.jdbc.OracleTypes.VARCHAR // 12;
+ // ,oracle.jdbc.OracleTypes.LONGVARCHAR // -1;
+ // ,oracle.jdbc.OracleTypes.DATE // 91;
+ // ,oracle.jdbc.OracleTypes.TIME // 92;
+ // ,oracle.jdbc.OracleTypes.TIMESTAMP // 93;
+ // // ,oracle.jdbc.OracleTypes.TIMESTAMPNS // -100; //<- Deprecated
+ // ,oracle.jdbc.OracleTypes.TIMESTAMPTZ // -101;
+ // ,oracle.jdbc.OracleTypes.TIMESTAMPLTZ // -102;
+ // ,oracle.jdbc.OracleTypes.INTERVALYM // -103;
+ // ,oracle.jdbc.OracleTypes.INTERVALDS // -104;
+ // ,oracle.jdbc.OracleTypes.BINARY // -2;
+ // /// ,oracle.jdbc.OracleTypes.VARBINARY // -3;
+ // ,oracle.jdbc.OracleTypes.LONGVARBINARY // -4;
+ // ,oracle.jdbc.OracleTypes.ROWID // -8;
+ // ,oracle.jdbc.OracleTypes.CURSOR // -10;
+ // ,oracle.jdbc.OracleTypes.BLOB // 2004;
+ // ,oracle.jdbc.OracleTypes.CLOB // 2005;
+ // // ,oracle.jdbc.OracleTypes.BFILE // -13;
+ // // ,oracle.jdbc.OracleTypes.STRUCT // 2002;
+ // // ,oracle.jdbc.OracleTypes.ARRAY // 2003;
+ // ,oracle.jdbc.OracleTypes.REF // 2006;
+ // ,oracle.jdbc.OracleTypes.NCHAR // -15;
+ // ,oracle.jdbc.OracleTypes.NCLOB // 2011;
+ // ,oracle.jdbc.OracleTypes.NVARCHAR // -9;
+ // ,oracle.jdbc.OracleTypes.LONGNVARCHAR // -16;
+ // // ,oracle.jdbc.OracleTypes.SQLXML // 2009;
+ // // ,oracle.jdbc.OracleTypes.OPAQUE // 2007;
+ // // ,oracle.jdbc.OracleTypes.JAVA_STRUCT // 2008;
+ // // ,oracle.jdbc.OracleTypes.JAVA_OBJECT // 2000;
+ // // ,oracle.jdbc.OracleTypes.PLSQL_INDEX_TABLE // -14;
+ // ,oracle.jdbc.OracleTypes.BINARY_FLOAT // 100;
+ // ,oracle.jdbc.OracleTypes.BINARY_DOUBLE // 101;
+ // ,oracle.jdbc.OracleTypes.NULL // 0;
+ // ,oracle.jdbc.OracleTypes.NUMBER // 2;
+ // // ,oracle.jdbc.OracleTypes.RAW // -2;
+ // // ,oracle.jdbc.OracleTypes.OTHER // 1111;
+ // ,oracle.jdbc.OracleTypes.FIXED_CHAR // 999;
+ // // ,oracle.jdbc.OracleTypes.DATALINK // 70;
+ // ,oracle.jdbc.OracleTypes.BOOLEAN // 16;
+ // };
+
+ /**
+ * Constants for things belonging to sqoop...
+ */
+ public static final class Sqoop {
+ private Sqoop() {
+ }
+
+ /**
+ * What type of Sqoop tool is being run.
+ */
+ public enum Tool {
+ UNKNOWN, IMPORT, EXPORT
+ }
+
+ public static final String IMPORT_TOOL_NAME = "import";
+ public static final String MAX_MAPREDUCE_ATTEMPTS =
+ "mapred.map.max.attempts";
+ }
+
+/**
+ * Constants for things belonging to Oracle...
+ */
+ public static final class Oracle {
+ private Oracle() {
+ }
+
+ public static final int ROWID_EXTENDED_ROWID_TYPE = 1;
+ public static final int ROWID_MAX_ROW_NUMBER_PER_BLOCK = 32767;
+
+ // This is how you comment-out a line of SQL text in Oracle.
+ public static final String ORACLE_SQL_STATEMENT_COMMENT_TOKEN = "--";
+
+ public static final String OBJECT_TYPE_TABLE = "TABLE";
+
+ public static final String URITYPE = "URITYPE";
+
+ public static final int MAX_IDENTIFIER_LENGTH = 30; // <- Max length of an
+ // Oracle name
+ // (table-name,
+ // partition-name etc.)
+
+ public static final String HINT_SYNTAX = "/*+ %s */ "; // Syntax for a hint
+ // in Oracle
+ }
+
+ /**
+ * Logging constants.
+ */
+ public static class Logging {
+ /**
+ * Level of log to output.
+ */
+ public enum Level {
+ TRACE, DEBUG, INFO, WARN, ERROR, FATAL
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java b/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
new file mode 100644
index 0000000..93efa76
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopDBInputSplit.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Text;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+
+class OraOopDBInputSplit extends DBInputFormat.DBInputSplit {
+
+ private int splitId;
+ private double totalNumberOfBlocksInAllSplits;
+ private String splitLocation;
+ private List<OraOopOracleDataChunk> oracleDataChunks;
+
+ // NB: Update write(), readFields() and getDebugDetails() if you add fields
+ // here.
+
+ public OraOopDBInputSplit() {
+
+ this.splitId = -1;
+ this.splitLocation = "";
+ this.oracleDataChunks = new ArrayList<OraOopOracleDataChunk>();
+ }
+
+ public OraOopDBInputSplit(List<OraOopOracleDataChunk> dataChunks) {
+
+ setOracleDataChunks(dataChunks);
+ }
+
+ public void setOracleDataChunks(List<OraOopOracleDataChunk> dataChunks) {
+
+ this.oracleDataChunks = dataChunks;
+ }
+
+ public List<OraOopOracleDataChunk> getDataChunks() {
+
+ return this.oracleDataChunks;
+ }
+
+ public int getNumberOfDataChunks() {
+
+ if (this.getDataChunks() == null) {
+ return 0;
+ } else {
+ return this.getDataChunks().size();
+ }
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+
+ if (this.splitLocation.isEmpty()) {
+ return new String[] {};
+ } else {
+ return new String[] { this.splitLocation };
+ }
+
+ }
+
+ /**
+ * @return The total number of blocks within the data-chunks of this split
+ */
+ @Override
+ public long getLength() {
+
+ return this.getTotalNumberOfBlocksInThisSplit();
+ }
+
+ public int getTotalNumberOfBlocksInThisSplit() {
+
+ if (this.getNumberOfDataChunks() == 0) {
+ return 0;
+ }
+
+ int result = 0;
+ for (OraOopOracleDataChunk dataChunk : this.getDataChunks()) {
+ result += dataChunk.getNumberOfBlocks();
+ }
+
+ return result;
+ }
+
+ public OraOopOracleDataChunk findDataChunkById(String id) {
+
+ for (OraOopOracleDataChunk dataChunk : this.getDataChunks()) {
+ if (dataChunk.getId().equals(id)) {
+ return dataChunk;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void write(DataOutput output) throws IOException {
+
+ output.writeInt(splitId);
+
+ if (this.oracleDataChunks == null) {
+ output.writeInt(0);
+ } else {
+ output.writeInt(this.oracleDataChunks.size());
+ for (OraOopOracleDataChunk dataChunk : this.oracleDataChunks) {
+ Text.writeString(output, dataChunk.getClass().getName());
+ dataChunk.write(output);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ /** {@inheritDoc} */
+ public void readFields(DataInput input) throws IOException {
+
+ this.splitId = input.readInt();
+
+ int dataChunkCount = input.readInt();
+ if (dataChunkCount == 0) {
+ this.oracleDataChunks = null;
+ } else {
+ Class<? extends OraOopOracleDataChunk> dataChunkClass;
+ OraOopOracleDataChunk dataChunk;
+ this.oracleDataChunks =
+ new ArrayList<OraOopOracleDataChunk>(dataChunkCount);
+ for (int idx = 0; idx < dataChunkCount; idx++) {
+ try {
+ dataChunkClass =
+ (Class<? extends OraOopOracleDataChunk>) Class.forName(Text
+ .readString(input));
+ dataChunk = dataChunkClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ dataChunk.readFields(input);
+ this.oracleDataChunks.add(dataChunk);
+ }
+ }
+ }
+
+ public String getDebugDetails() {
+
+ StringBuilder result = new StringBuilder();
+
+ if (this.getNumberOfDataChunks() == 0) {
+ result.append(String.format(
+ "Split[%s] does not contain any Oracle data-chunks.", this.splitId));
+ } else {
+ result.append(String.format(
+ "Split[%s] includes the Oracle data-chunks:\n", this.splitId));
+ for (OraOopOracleDataChunk dataChunk : getDataChunks()) {
+ result.append(dataChunk.toString());
+ }
+ }
+ return result.toString();
+ }
+
+ protected int getSplitId() {
+ return this.splitId;
+ }
+
+ protected void setSplitId(int newSplitId) {
+ this.splitId = newSplitId;
+ }
+
+ protected void setSplitLocation(String newSplitLocation) {
+ this.splitLocation = newSplitLocation;
+ }
+
+ protected void setTotalNumberOfBlocksInAllSplits(
+ int newTotalNumberOfBlocksInAllSplits) {
+ this.totalNumberOfBlocksInAllSplits = newTotalNumberOfBlocksInAllSplits;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java b/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
new file mode 100644
index 0000000..45a88ef
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopDBRecordReader.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBRecordReader;
+import org.apache.sqoop.manager.oracle.OraOopConstants.
+ OraOopTableImportWhereClauseLocation;
+import org.apache.sqoop.manager.oracle.OraOopUtilities.OraOopStatsReports;
+
+/*
+ * NOTES:
+ *
+ * T is the output-type of this record reader.
+ *
+ * getFieldNames() is overridden to insert a "data_chunk_id" column
+ * containing the id (integer) of the Oracle data-chunk the data
+ * was obtained from. This is used to calculate the "percentage complete"
+ * for this mapper.
+ *
+ * getSelectQuery() is overridden to inject the actual data_chunk_id number
+ * into the query that is executed (for the data-chunk being processed).
+ *
+ * This class extends DBRecordReader. Unfortunately, DBRecordReader does
+ * not expose its results property (of type ResultSet), so we have to
+ * override executeQuery() in order to obtain a reference to the data
+ * obtained when the SQL generated by getSelectQuery() is executed.
+ */
+class OraOopDBRecordReader<T extends SqoopRecord> extends
+ DataDrivenDBRecordReader<T> {
+
+ // Logger shared by all instances of this record reader.
+ private static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopDBRecordReader.class);
+
+ // The split this record-reader is working on (validated by castSplit()).
+ private OraOopDBInputSplit dbInputSplit;
+ // Total number of Oracle blocks in this split, as reported by
+ // dbInputSplit.getTotalNumberOfBlocksInThisSplit() in the constructor.
+ private int numberOfBlocksInThisSplit;
+ // How many Oracle blocks this record-reader has accounted for so far;
+ // updated in nextKeyValue() and reported by getPos()/getProgress().
+ private int numberOfBlocksProcessedInThisSplit;
+ // The data_chunk_id value of the most recently read row (null until the
+ // first row has been read).
+ private String currentDataChunkId;
+ // The ResultSet captured in executeQuery(), i.e. the data produced by the
+ // query returned by getSelectQuery().
+ private ResultSet results;
+ // The zero-based index of the data_chunk_id column; assigned by
+ // getFieldNames() and -1 until that method has been called.
+ private int columnIndexDataChunkIdZeroBased = -1;
+ // Whether nextKeyValue() has already logged a failure to read the
+ // data_chunk_id column (so the warning is only emitted once).
+ private boolean progressCalculationErrorLogged;
+ // The handle returned by OraOopUtilities.startSessionSnapshot() for this
+ // Oracle session; only set when session statistics reports are requested.
+ private Object oraOopOraStats;
+ // Whether to collect profiling metrics ("oraoop.profiling.enabled").
+ private boolean profilingEnabled;
+ // Total time spent in super.nextKeyValue(); only accumulated when
+ // profilingEnabled is true.
+ private long timeSpentInNextKeyValueInNanoSeconds;
+
+ /**
+  * Creates a record reader for a single OraOop split.
+  *
+  * Besides delegating to the superclass, this enables debug logging if
+  * configured, validates the split type, logs the Oracle instance the
+  * connection is attached to, initializes the Oracle session, optionally
+  * starts a session-statistics snapshot and records the number of blocks
+  * in the split for progress reporting.
+  *
+  * @param split the split to read; must be an OraOopDBInputSplit.
+  * @param inputClass the record class passed through to the superclass.
+  * @param conf the job configuration.
+  * @param conn the JDBC connection used to read the split.
+  * @param dbConfig the database configuration passed to the superclass.
+  * @param cond the user-supplied conditions passed to the superclass.
+  * @param fields the field names passed to the superclass.
+  * @param table the table name passed to the superclass.
+  * @throws SQLException declared for the superclass constructor and the
+  *           Oracle calls made here.
+  */
+ public OraOopDBRecordReader(DBInputFormat.DBInputSplit split,
+     Class<T> inputClass, Configuration conf, Connection conn,
+     DBConfiguration dbConfig, String cond, String[] fields, String table)
+     throws SQLException {
+
+   super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+       "ORACLE-ORAOOP");
+
+   OraOopUtilities.enableDebugLoggingIfRequired(conf);
+
+   this.dbInputSplit = castSplit(split);
+
+   String instanceName =
+       OraOopOracleQueries.getCurrentOracleInstanceName(conn);
+   String connectionMessage = String.format(
+       "This record reader is connected to Oracle via the JDBC URL: \n"
+           + "\t\"%s\"\n\tto the Oracle instance: \"%s\"",
+       conn.toString(), instanceName);
+   LOG.info(connectionMessage);
+
+   OracleConnectionFactory.initializeOracleConnection(conn, conf);
+
+   boolean statsWanted =
+       OraOopUtilities.userWantsOracleSessionStatisticsReports(conf);
+   if (statsWanted) {
+     this.oraOopOraStats = OraOopUtilities.startSessionSnapshot(conn);
+   }
+
+   // Progress bookkeeping: nothing processed yet, total taken from the split.
+   this.numberOfBlocksProcessedInThisSplit = 0;
+   this.numberOfBlocksInThisSplit =
+       this.dbInputSplit.getTotalNumberOfBlocksInThisSplit();
+
+   this.profilingEnabled = conf.getBoolean("oraoop.profiling.enabled", false);
+ }
+
+ /**
+  * Checks that a split is exactly an OraOopDBInputSplit and casts it.
+  *
+  * @param split the split handed to the record reader.
+  * @return the same object, typed as OraOopDBInputSplit.
+  * @throws IllegalArgumentException if split is null.
+  * @throws RuntimeException if the runtime class of split is anything other
+  *           than OraOopDBInputSplit itself (subclasses are rejected too).
+  */
+ public static OraOopDBInputSplit castSplit(DBInputSplit split) {
+
+   if (split == null) {
+     throw new IllegalArgumentException("The DBInputSplit cannot be null.");
+   }
+
+   // TODO Cast using the expected class object, so that only one line of
+   // code identifies the type of the split class.
+   Class<?> expectedClass = OraOopDBInputSplit.class;
+   Class<?> actualClass = split.getClass();
+   if (actualClass == expectedClass) {
+     return (OraOopDBInputSplit) split;
+   }
+
+   throw new RuntimeException(String.format(
+       "The type of Split available within %s "
+           + "should be an instance of class %s, "
+           + "but is actually an instance of class %s",
+       OraOopUtilities.getCurrentMethodName(), expectedClass.getName(),
+       actualClass.getName()));
+ }
+
+ /**
+  * Returns the superclass field names with the data_chunk_id pseudo-column
+  * appended as the last element.
+  *
+  * Side effect: records the zero-based position of that extra column in
+  * columnIndexDataChunkIdZeroBased.
+  */
+ @Override
+ protected String[] getFieldNames() {
+
+   String[] inherited = super.getFieldNames();
+
+   ArrayList<String> names = new ArrayList<String>(inherited.length + 1);
+   for (String name : inherited) {
+     names.add(name);
+   }
+   names.add(OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID);
+
+   this.columnIndexDataChunkIdZeroBased = names.size() - 1;
+
+   return names.toArray(new String[names.size()]);
+ }
+
+ /**
+  * Builds the SQL used to read every data-chunk of this split.
+  *
+  * One SELECT is generated per data-chunk and the SELECTs are combined with
+  * UNION ALL. Each SELECT carries the chunk's id as a literal in the
+  * data_chunk_id column, the chunk's partition clause and the chunk's
+  * WHERE clause, plus an "AS OF SCN" clause when consistent read is
+  * enabled. The user's conditions are either ANDed onto every per-chunk
+  * SELECT (SUBSPLIT) or applied once to an outer SELECT wrapped around the
+  * whole UNION (SPLIT).
+  *
+  * @return the complete query text (also logged at INFO level).
+  * @throws RuntimeException if consistent read is enabled but no SCN is
+  *           configured, if the split's data-chunk list is null, or if the
+  *           table's column metadata cannot be read.
+  */
+ @Override
+ protected String getSelectQuery() {
+
+ // Consistent read needs a non-zero SCN in the configuration; 0 is the
+ // default used here when the property is absent.
+ boolean consistentRead =
+ this.getDBConf().getConf().getBoolean(
+ OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ, false);
+ long consistentReadScn =
+ this.getDBConf().getConf().getLong(
+ OraOopConstants.ORAOOP_IMPORT_CONSISTENT_READ_SCN, 0L);
+ if (consistentRead && consistentReadScn == 0L) {
+ throw new RuntimeException("Could not get SCN for consistent read.");
+ }
+
+ StringBuilder query = new StringBuilder();
+
+ if (this.dbInputSplit.getDataChunks() == null) {
+ String errMsg =
+ String.format("The %s does not contain any data-chunks, within %s.",
+ this.dbInputSplit.getClass().getName(), OraOopUtilities
+ .getCurrentMethodName());
+ throw new RuntimeException(errMsg);
+ }
+
+ // Where to apply the user's conditions; SUBSPLIT is passed as the
+ // default argument to the lookup.
+ OraOopConstants.OraOopTableImportWhereClauseLocation whereClauseLocation =
+ OraOopUtilities.getOraOopTableImportWhereClauseLocation(this
+ .getDBConf().getConf(),
+ OraOopConstants.OraOopTableImportWhereClauseLocation.SUBSPLIT);
+
+ // Column metadata is needed by getColumnNamesClause() to detect columns
+ // that require special handling (URITYPE).
+ OracleTable tableContext = getOracleTableContext();
+ OracleTableColumns tableColumns = null;
+ try {
+
+ Configuration conf = this.getDBConf().getConf();
+
+ tableColumns =
+ OraOopOracleQueries.getTableColumns(getConnection(), tableContext,
+ OraOopUtilities.omitLobAndLongColumnsDuringImport(conf),
+ OraOopUtilities.recallSqoopJobType(conf)
+ , true // <- onlyOraOopSupportedTypes
+ , true // <- omitOraOopPseudoColumns
+ );
+ } catch (SQLException ex) {
+ LOG.error(String.format(
+ "Unable to obtain the data-types of the columns in table %s.\n"
+ + "Error:\n%s", tableContext.toString(), ex.getMessage()));
+ throw new RuntimeException(ex);
+ }
+
+ // Emit one SELECT per data-chunk, joined with UNION ALL.
+ int numberOfDataChunks = this.dbInputSplit.getNumberOfDataChunks();
+ for (int idx = 0; idx < numberOfDataChunks; idx++) {
+
+ OraOopOracleDataChunk dataChunk =
+ this.dbInputSplit.getDataChunks().get(idx);
+
+ if (idx > 0) {
+ query.append("UNION ALL \n");
+ }
+
+ // SELECT clause, with this chunk's id injected as a literal.
+ query.append(getColumnNamesClause(tableColumns, dataChunk.getId())) // <-
+ // SELECT
+ // clause
+ .append("\n");
+
+ query.append(" FROM ").append(this.getTableName()).append(" ");
+
+ if (consistentRead) {
+ query.append("AS OF SCN ").append(consistentReadScn).append(" ");
+ }
+
+ // The partition clause is followed by the table alias "t".
+ query.append(getPartitionClauseForDataChunk(this.dbInputSplit, idx))
+ .append(" t").append("\n");
+
+ query.append(" WHERE (").append(
+ getWhereClauseForDataChunk(this.dbInputSplit, idx)).append(")\n");
+
+ // If the user wants the WHERE clause applied to each data-chunk...
+ if (whereClauseLocation
+ == OraOopTableImportWhereClauseLocation.SUBSPLIT) {
+ String conditions = this.getConditions();
+ if (conditions != null && conditions.length() > 0) {
+ query.append(" AND (").append(conditions).append(")\n");
+ }
+ }
+
+ }
+
+ // If the user wants the WHERE clause applied to the whole split...
+ if (whereClauseLocation == OraOopTableImportWhereClauseLocation.SPLIT) {
+ String conditions = this.getConditions();
+ if (conditions != null && conditions.length() > 0) {
+
+ // Insert a "select everything" line at the start of the SQL query...
+ // (a null chunk id leaves the data_chunk_id column name as-is, so the
+ // outer SELECT picks up the value produced by the inner SELECTs).
+ query.insert(0, getColumnNamesClause(tableColumns, null) + " FROM (\n");
+
+ // ...and then apply the WHERE clause to all the UNIONed sub-queries...
+ query.append(")\n").append("WHERE\n").append(conditions).append("\n");
+ }
+ }
+
+ LOG.info("SELECT QUERY = \n" + query.toString());
+
+ return query.toString();
+ }
+
+ /**
+  * Builds the "SELECT hint col1,col2,..." prefix of a query.
+  *
+  * URITYPE columns are wrapped in uritype.geturl(...). When a non-empty
+  * dataChunkId is supplied, the trailing data_chunk_id pseudo-column is
+  * emitted as a quoted literal aliased to the pseudo-column name; otherwise
+  * the bare pseudo-column name is emitted.
+  *
+  * @param tableColumns metadata of the table's columns, used to look up each
+  *          field's Oracle data-type.
+  * @param dataChunkId the id to inject as the data_chunk_id value, or null
+  *          or empty to leave the column name untouched.
+  * @return the SELECT clause, without a trailing newline or FROM.
+  */
+ private String getColumnNamesClause(OracleTableColumns tableColumns,
+     String dataChunkId) {
+
+   StringBuilder result = new StringBuilder();
+
+   result.append("SELECT ");
+   result.append(OraOopUtilities.getImportHint(this.getDBConf().getConf()));
+
+   // NB: getFieldNames() also (re)assigns columnIndexDataChunkIdZeroBased,
+   // which is compared against below, so it must be called first.
+   String[] fieldNames = this.getFieldNames();
+
+   for (int i = 0; i < fieldNames.length; i++) {
+     if (i > 0) {
+       result.append(",");
+     }
+     String fieldName = fieldNames[i];
+
+     OracleTableColumn oracleTableColumn =
+         tableColumns.findColumnByName(fieldName);
+     if (oracleTableColumn != null
+         && oracleTableColumn.getDataType().equals(
+             OraOopConstants.Oracle.URITYPE)) {
+       fieldName = String.format("uritype.geturl(%s)", fieldName);
+     }
+
+     // If this field is the "data_chunk_id" that we appended in
+     // getFieldNames() then we need to insert the value of that
+     // data_chunk_id now. Compare by value: reference equality (==) on
+     // Strings only works by accident when both sides are the same object.
+     if (i == this.columnIndexDataChunkIdZeroBased
+         && OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID.equals(fieldName)
+         && dataChunkId != null && !dataChunkId.isEmpty()) {
+       fieldName =
+           String.format("'%s' %s", dataChunkId,
+               OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID);
+     }
+
+     result.append(fieldName);
+   }
+   return result.toString();
+ }
+
+ /**
+  * Looks up a data-chunk of the split by position and returns its
+  * partition clause.
+  *
+  * @param split the split that owns the data-chunks.
+  * @param dataChunkIndex the zero-based position of the data-chunk.
+  * @return whatever getPartitionClause() yields for that data-chunk.
+  */
+ private String getPartitionClauseForDataChunk(OraOopDBInputSplit split,
+     int dataChunkIndex) {
+   return split.getDataChunks().get(dataChunkIndex).getPartitionClause();
+ }
+
+ /**
+  * Looks up a data-chunk of the split by position and returns its WHERE
+  * clause.
+  *
+  * @param split the split that owns the data-chunks.
+  * @param dataChunkIndex the zero-based position of the data-chunk.
+  * @return whatever getWhereClause() yields for that data-chunk.
+  */
+ private String getWhereClauseForDataChunk(OraOopDBInputSplit split,
+     int dataChunkIndex) {
+   return split.getDataChunks().get(dataChunkIndex).getWhereClause();
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The position is expressed in Oracle blocks: a split is made of several
+  * data-chunks, each of which is made of several blocks, and this returns
+  * how many blocks of the split have been accounted for so far.
+  */
+ @Override
+ public long getPos() throws IOException {
+   return this.numberOfBlocksProcessedInThisSplit;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float getProgress() throws IOException {
+
+ return numberOfBlocksProcessedInThisSplit
+ / (float) numberOfBlocksInThisSplit;
+ }
+
+ /**
+  * Advances to the next row and updates the block-based progress counters.
+  *
+  * After delegating to the superclass, the data_chunk_id of the row just
+  * read is compared with that of the previous row. When it changes, the
+  * previous data-chunk is considered finished and its block count is added
+  * to the number of blocks processed in this split.
+  *
+  * @return the value returned by super.nextKeyValue().
+  * @throws IOException if the superclass fails; when the failure indicates
+  *           that the Oracle session was killed, a notice is logged before
+  *           the exception is rethrown.
+  */
+ @Override
+ public boolean nextKeyValue() throws IOException {
+
+   boolean result = false;
+   try {
+
+     long startTime = 0;
+     if (this.profilingEnabled) {
+       startTime = System.nanoTime();
+     }
+
+     result = super.nextKeyValue();
+
+     if (this.profilingEnabled) {
+       this.timeSpentInNextKeyValueInNanoSeconds +=
+           System.nanoTime() - startTime;
+     }
+
+     // Keep track of which data-chunk we're processing, and therefore how many
+     // Oracle blocks we've processed. This can be used to calculate our
+     // "percentage complete"...
+     if (result && this.results != null) {
+
+       String thisDataChunkId = null;
+       try {
+         // ColumnIndexes are 1-based in jdbc...
+         thisDataChunkId =
+             this.results.getString(this.columnIndexDataChunkIdZeroBased + 1);
+       } catch (SQLException ex) {
+         if (!progressCalculationErrorLogged) {
+           // This prevents us from flooding the log with the same message
+           // thousands of times...
+           progressCalculationErrorLogged = true;
+
+           LOG.warn(String
+               .format(
+                   "Unable to obtain the value of the %s column in method %s.\n"
+                       + "\tthis.columnIndexDataChunkIdZeroBased = %d (NB: "
+                       + "jdbc field indexes are 1-based)\n\tAs a consequence, "
+                       + "progress for the record-reader cannot be calculated.\n"
+                       + "\tError=\n%s",
+                   OraOopConstants.COLUMN_NAME_DATA_CHUNK_ID, OraOopUtilities
+                       .getCurrentMethodName(),
+                   this.columnIndexDataChunkIdZeroBased, ex.getMessage()));
+         }
+       }
+
+       if (thisDataChunkId != null
+           && !thisDataChunkId.equals(this.currentDataChunkId)) {
+         if (this.currentDataChunkId != null
+             && !this.currentDataChunkId.isEmpty()) {
+           // The row belongs to a different data-chunk than the previous
+           // row, so the previous data-chunk has been fully read: credit
+           // the blocks of that finished chunk (not of the chunk that has
+           // only just started).
+           OraOopOracleDataChunk finishedChunk =
+               this.dbInputSplit.findDataChunkById(this.currentDataChunkId);
+           if (finishedChunk != null) {
+             this.numberOfBlocksProcessedInThisSplit +=
+                 finishedChunk.getNumberOfBlocks();
+           }
+         }
+         this.currentDataChunkId = thisDataChunkId;
+       }
+     }
+   } catch (IOException ex) {
+     if (OraOopUtilities.oracleSessionHasBeenKilled(ex)) {
+       LOG.info("\n*********************************************************"
+           + "\nThe Oracle session in use has been killed by a 3rd party."
+           + "\n*********************************************************");
+     }
+     throw ex;
+   }
+
+   return result;
+ }
+
+ /**
+  * Runs the query through the superclass and keeps a reference to the
+  * ResultSet, because the superclass does not expose it and nextKeyValue()
+  * needs it to read the data_chunk_id column.
+  *
+  * @param query the SQL to execute.
+  * @return the ResultSet produced by the superclass.
+  * @throws SQLException if the superclass fails; the query and the error
+  *           message are logged before the exception is rethrown.
+  */
+ @Override
+ protected ResultSet executeQuery(String query) throws SQLException {
+
+   ResultSet resultSet;
+   try {
+     resultSet = super.executeQuery(query);
+   } catch (SQLException sqlEx) {
+     LOG.error(String.format(
+         "Error in %s while executing the SQL query:\n%s\n\n%s",
+         OraOopUtilities.getCurrentMethodName(), query,
+         sqlEx.getMessage()));
+     throw sqlEx;
+   }
+
+   this.results = resultSet;
+   return resultSet;
+ }
+
+ /**
+  * Emits the optional profiling and Oracle session-statistics reports and
+  * then closes the underlying record reader.
+  *
+  * The superclass is closed in a finally block, so a failure while
+  * producing or writing the reports cannot prevent super.close() from
+  * running.
+  *
+  * @throws IOException if writing a report or closing the superclass fails.
+  */
+ @Override
+ public void close() throws IOException {
+
+   try {
+     if (this.profilingEnabled) {
+       LOG.info(String.format(
+           "Time spent in super.nextKeyValue() = %s seconds.",
+           this.timeSpentInNextKeyValueInNanoSeconds / Math.pow(10, 9)));
+     }
+
+     if (OraOopUtilities.userWantsOracleSessionStatisticsReports(getDBConf()
+         .getConf())) {
+       OraOopStatsReports reports =
+           OraOopUtilities.stopSessionSnapshot(this.oraOopOraStats);
+       this.oraOopOraStats = null;
+
+       LOG.info(String.format("Oracle Statistics Report for OraOop:\n\n%s",
+           reports.getPerformanceReport()));
+
+       // One CSV file and one plain-text file per split, keyed by split id.
+       String fileName =
+           String.format("oracle-stats-csv-%d", this.dbInputSplit.getSplitId());
+       OraOopUtilities.writeOutputFile(this.getDBConf().getConf(), fileName,
+           reports.getCsvReport());
+
+       fileName =
+           String.format("oracle-stats-%d", this.dbInputSplit.getSplitId());
+       OraOopUtilities.writeOutputFile(this.getDBConf().getConf(), fileName,
+           reports.getPerformanceReport());
+     }
+   } finally {
+     // Always let the superclass perform its own clean-up, even if the
+     // reporting above threw.
+     super.close();
+   }
+ }
+
+ /**
+  * Builds an OracleTable from the table owner and table name stored in the
+  * job configuration.
+  *
+  * @return a new OracleTable for ORAOOP_TABLE_OWNER / ORAOOP_TABLE_NAME.
+  */
+ public OracleTable getOracleTableContext() {
+
+   Configuration jobConf = this.getDBConf().getConf();
+   String owner = jobConf.get(OraOopConstants.ORAOOP_TABLE_OWNER);
+   String name = jobConf.get(OraOopConstants.ORAOOP_TABLE_NAME);
+   return new OracleTable(owner, name);
+ }
+
+}