You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/06/29 18:03:16 UTC
[5/7] SQOOP-1287: Add high performance Oracle connector into Sqoop
(David Robson via Venkat Ranganathan)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
new file mode 100644
index 0000000..7fd18a1
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
@@ -0,0 +1,1687 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.security.InvalidParameterException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Contains the queries to get data dictionary information from Oracle database.
+ */
+public final class OraOopOracleQueries {
+
+ private static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopOracleQueries.class);
+
+ private static Class<?> oracleConnectionClass;
+ private static Class<?> oracleStatementClass;
+ private static Class<?> oracleResultSetClass;
+ private static Class<?> oracleTypesClass;
+ private static Class<?> oracleDateClass;
+ private static Method methSetLongAtName;
+ private static Method methSetBigDecimalAtName;
+ private static Method methSetStringAtName;
+ private static Method methSetTimestampAtName;
+ private static Method methSetBinaryDoubleAtName;
+ private static Method methSetObjectAtName;
+ private static Method methSetBinaryFloatAtName;
+ private static Method methSetIntAtName;
+
+ private static final Map<String, Integer> ORACLE_TYPES =
+ new HashMap<String, Integer>();
+
+ static {
+ try {
+ oracleStatementClass =
+ Class.forName("oracle.jdbc.OraclePreparedStatement");
+ methSetLongAtName =
+ oracleStatementClass.getMethod("setLongAtName", String.class,
+ long.class);
+ methSetBigDecimalAtName =
+ oracleStatementClass.getMethod("setBigDecimalAtName", String.class,
+ BigDecimal.class);
+ methSetStringAtName =
+ oracleStatementClass.getMethod("setStringAtName", String.class,
+ String.class);
+ methSetTimestampAtName =
+ oracleStatementClass.getMethod("setTimestampAtName", String.class,
+ Timestamp.class);
+ methSetBinaryDoubleAtName =
+ oracleStatementClass.getMethod("setBinaryDoubleAtName", String.class,
+ double.class);
+ methSetObjectAtName =
+ oracleStatementClass.getMethod("setObjectAtName", String.class,
+ Object.class);
+ methSetBinaryFloatAtName =
+ oracleStatementClass.getMethod("setBinaryFloatAtName", String.class,
+ float.class);
+ methSetIntAtName =
+ oracleStatementClass.getMethod("setIntAtName", String.class,
+ int.class);
+
+ oracleResultSetClass = Class.forName("oracle.jdbc.OracleResultSet");
+ oracleDateClass = Class.forName("oracle.sql.DATE");
+ oracleConnectionClass = Class.forName("oracle.jdbc.OracleConnection");
+ oracleTypesClass = Class.forName("oracle.jdbc.OracleTypes");
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Problem getting Oracle JDBC methods via reflection.", e);
+ }
+ }
+
+ private OraOopOracleQueries() {
+ }
+
+ protected static void setJdbcFetchSize(Connection connection,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ int fetchSize =
+ conf.getInt(OraOopConstants.ORACLE_ROW_FETCH_SIZE,
+ OraOopConstants.ORACLE_ROW_FETCH_SIZE_DEFAULT);
+ try {
+ Method methSetPrefetch =
+ oracleConnectionClass.getMethod("setDefaultRowPrefetch", int.class);
+ methSetPrefetch.invoke(connection, fetchSize);
+
+ String msg =
+ "The Oracle connection has had its default row fetch size set to : "
+ + fetchSize;
+ if (fetchSize == OraOopConstants.ORACLE_ROW_FETCH_SIZE_DEFAULT) {
+ LOG.debug(msg);
+ } else {
+ LOG.info(msg);
+ }
+ } catch (Exception ex) {
+ LOG.warn(
+ String
+ .format(
+ "Unable to configure the DefaultRowPrefetch of the "
+ + "Oracle connection in %s.",
+ OraOopUtilities.getCurrentMethodName()), ex);
+ }
+
+ }
+
+ public static void setConnectionTimeZone(Connection connection,
+ Configuration conf) {
+ String timeZoneString = conf.get("oracle.sessionTimeZone", "GMT");
+ setConnectionTimeZone(connection, timeZoneString);
+ }
+
+ public static void setConnectionTimeZone(Connection connection,
+ String timeZone) {
+ TimeZone timeZoneObj = TimeZone.getTimeZone(timeZone);
+ try {
+ Method methSession =
+ oracleConnectionClass.getMethod("setSessionTimeZone", String.class);
+ Method methDefault =
+ oracleConnectionClass.getMethod("setDefaultTimeZone", TimeZone.class);
+ methSession.invoke(connection, timeZoneObj.getID());
+ methDefault.invoke(connection, timeZoneObj);
+ TimeZone.setDefault(timeZoneObj);
+ LOG.info("Session Time Zone set to " + timeZoneObj.getID());
+ } catch (Exception e) {
+ LOG.error("Error setting time zone: " + e.getMessage());
+ }
+ }
+
+ public static OracleTablePartitions getPartitions(Connection connection,
+ OracleTable table) throws SQLException {
+
+ OracleTablePartitions result = new OracleTablePartitions();
+
+ PreparedStatement statement =
+ connection
+ .prepareStatement("with"
+ + " partitions as"
+ + " (select table_owner, table_name, partition_name"
+ + " from dba_tab_partitions"
+ + " where"
+ + " table_owner = ? and"
+ + " table_name = ?),"
+ + " subpartitions as"
+ + " (select table_owner, table_name, partition_name, subpartition_name"
+ + " from dba_tab_subpartitions"
+ + " where"
+ + " table_owner = ? and"
+ + " table_name = ?)"
+ + " select"
+ + " partitions.partition_name,"
+ + " subpartitions.subpartition_name"
+ + " from partitions left outer join subpartitions on"
+ + " (partitions.table_owner = subpartitions.table_owner"
+ + " and partitions.table_name = subpartitions.table_name"
+ + " and partitions.partition_name = subpartitions.partition_name)"
+ + " order by partition_name, subpartition_name");
+
+ statement.setString(1, table.getSchema());
+ statement.setString(2, table.getName());
+ statement.setString(3, table.getSchema());
+ statement.setString(4, table.getName());
+
+ ResultSet resultSet = statement.executeQuery();
+
+ OracleTablePartition partition = null;
+ while (resultSet.next()) {
+ String partitionName = resultSet.getString("partition_name");
+ String subPartitionName = resultSet.getString("subpartition_name");
+
+ if (subPartitionName != null && !("".equals(subPartitionName))) {
+ partition = new OracleTablePartition(subPartitionName, true);
+ result.add(partition);
+ } else {
+ if (partition == null || partition.isSubPartition()
+ || partition.getName() != partitionName) {
+ partition = new OracleTablePartition(partitionName, false);
+ result.add(partition);
+ }
+ }
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static int getOracleDataObjectNumber(Connection connection,
+ OracleTable table) throws SQLException {
+
+ PreparedStatement statement =
+ connection.prepareStatement("SELECT data_object_id "
+ + " FROM dba_objects" + " WHERE owner = ?" + " and object_name = ?"
+ + " and object_type = ?");
+ statement.setString(1, table.getSchema());
+ statement.setString(2, table.getName());
+ statement.setString(3, "TABLE");
+
+ ResultSet resultSet = statement.executeQuery();
+
+ resultSet.next();
+ int result = resultSet.getInt("data_object_id");
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ private static String getPartitionBindVars(List<String> partitionList) {
+ String result = "";
+ for (int i = 1; i <= partitionList.size(); i++) {
+ result += (i > 1) ? "," : "";
+ result += ":part" + i;
+ }
+ return result;
+ }
+
+ private static void bindPartitionBindVars(PreparedStatement statement,
+ List<String> partitionList) throws SQLException {
+ int i = 0;
+ for (String partition : partitionList) {
+ i++;
+ OraOopOracleQueries.setStringAtName(statement, "part" + i, partition);
+ }
+ }
+
+ public static List<OraOopOracleDataChunkPartition>
+ getOracleDataChunksPartition(Connection connection, OracleTable table,
+ List<String> partitionList) throws SQLException {
+ List<OraOopOracleDataChunkPartition> result =
+ new ArrayList<OraOopOracleDataChunkPartition>();
+ String sql =
+ "SELECT "
+ + " pl.partition_name, "
+ + " pl.is_subpartition, "
+ + " s.blocks "
+ + "FROM "
+ + " (SELECT tp.table_owner, "
+ + " tp.table_name, "
+ + " NVL(tsp.subpartition_name,tp.partition_name) partition_name, "
+ + " nvl2(tsp.subpartition_name,1,0) is_subpartition "
+ + " FROM dba_tab_partitions tp, "
+ + " dba_tab_subpartitions tsp "
+ + " WHERE tp.table_owner = :table_owner"
+ + " AND tp.table_name = :table_name"
+ + " AND tsp.table_owner(+) =tp.table_owner "
+ + " AND tsp.table_name(+) =tp.table_name "
+ + " AND tsp.partition_name(+)=tp.partition_name ";
+
+ if (partitionList != null && partitionList.size() > 0) {
+ sql +=
+ " AND tp.partition_name IN (" + getPartitionBindVars(partitionList)
+ + ") ";
+ }
+
+ sql +=
+ " ) pl, " + " dba_segments s "
+ + "WHERE s.owner =pl.table_owner "
+ + "AND s.segment_name =pl.table_name "
+ + "AND s.partition_name=pl.partition_name ";
+
+ PreparedStatement statement = connection.prepareStatement(sql);
+ OraOopOracleQueries.setStringAtName(statement, "table_owner", table
+ .getSchema());
+ OraOopOracleQueries.setStringAtName(statement, "table_name", table
+ .getName());
+
+ if (partitionList != null && partitionList.size() > 0) {
+ bindPartitionBindVars(statement, partitionList);
+ }
+
+ trace(String.format("%s SQL Query =\n%s", OraOopUtilities
+ .getCurrentMethodName(), sql.replace(":table_owner", table.getSchema())
+ .replace(":table_name", table.getName())));
+
+ ResultSet resultSet = statement.executeQuery();
+
+ while (resultSet.next()) {
+ OraOopOracleDataChunkPartition dataChunk =
+ new OraOopOracleDataChunkPartition(resultSet
+ .getString("partition_name"), resultSet
+ .getBoolean("is_subpartition"), resultSet.getInt("blocks"));
+ result.add(dataChunk);
+ }
+ resultSet.close();
+ statement.close();
+ return result;
+ }
+
+ public static List<OraOopOracleDataChunkExtent> getOracleDataChunksExtent(
+ Configuration conf, Connection connection, OracleTable table,
+ List<String> partitionList, int numberOfChunksPerOracleDataFile)
+ throws SQLException {
+
+ List<OraOopOracleDataChunkExtent> result =
+ new ArrayList<OraOopOracleDataChunkExtent>();
+
+ String sql =
+ "SELECT data_object_id, "
+ + "file_id, "
+ + "relative_fno, "
+ + "file_batch, "
+ + "MIN (start_block_id) start_block_id, "
+ + "MAX (end_block_id) end_block_id, "
+ + "SUM (blocks) blocks "
+ + "FROM (SELECT o.data_object_id, "
+ + "e.file_id, "
+ + "e.relative_fno, "
+ + "e.block_id start_block_id, "
+ + "e.block_id + e.blocks - 1 end_block_id, "
+ + "e.blocks, "
+ + "CEIL ( "
+ + " SUM ( "
+ + " e.blocks) "
+ + " OVER (PARTITION BY o.data_object_id, e.file_id "
+ + " ORDER BY e.block_id ASC) "
+ + " / (SUM (e.blocks) "
+ + " OVER (PARTITION BY o.data_object_id, e.file_id) "
+ + " / :numchunks)) "
+ + " file_batch "
+ + "FROM dba_extents e, dba_objects o, dba_tab_subpartitions tsp "
+ + "WHERE o.owner = :owner "
+ + "AND o.object_name = :object_name "
+ + "AND e.owner = :owner "
+ + "AND e.segment_name = :object_name "
+ + "AND o.owner = e.owner "
+ + "AND o.object_name = e.segment_name "
+ + "AND (o.subobject_name = e.partition_name "
+ + " OR (o.subobject_name IS NULL AND e.partition_name IS NULL)) "
+ + "AND o.owner = tsp.table_owner(+) "
+ + "AND o.object_name = tsp.table_name(+) "
+ + "AND o.subobject_name = tsp.subpartition_name(+) ";
+
+ if (partitionList != null && partitionList.size() > 0) {
+ sql +=
+ " AND case when o.object_type='TABLE SUBPARTITION' then "
+ + "tsp.partition_name else o.subobject_name end IN ("
+ + getPartitionBindVars(partitionList) + ") ";
+ }
+
+ sql +=
+ ") " + "GROUP BY data_object_id, " + " file_id, "
+ + " relative_fno, " + " file_batch "
+ + "ORDER BY data_object_id, " + " file_id, "
+ + " relative_fno, " + " file_batch";
+
+ sql = conf.get(OraOopConstants.ORAOOP_ORACLE_DATA_CHUNKS_QUERY, sql);
+
+ PreparedStatement statement = connection.prepareStatement(sql);
+ OraOopOracleQueries.setIntAtName(statement, "numchunks",
+ numberOfChunksPerOracleDataFile);
+ OraOopOracleQueries.setStringAtName(statement, "owner", table.getSchema());
+ OraOopOracleQueries.setStringAtName(statement, "object_name", table
+ .getName());
+
+ if (partitionList != null && partitionList.size() > 0) {
+ bindPartitionBindVars(statement, partitionList);
+ }
+
+ trace(String.format("%s SQL Query =\n%s", OraOopUtilities
+ .getCurrentMethodName(), sql.replace(":numchunks",
+ Integer.toString(numberOfChunksPerOracleDataFile)).replace(":owner",
+ table.getSchema()).replace(":object_name", table.getName())));
+
+ ResultSet resultSet = statement.executeQuery();
+
+ while (resultSet.next()) {
+ int fileId = resultSet.getInt("relative_fno");
+ int fileBatch = resultSet.getInt("file_batch");
+ String dataChunkId =
+ OraOopUtilities.generateDataChunkId(fileId, fileBatch);
+ OraOopOracleDataChunkExtent dataChunk =
+ new OraOopOracleDataChunkExtent(dataChunkId, resultSet
+ .getInt("data_object_id"), resultSet.getInt("relative_fno"),
+ resultSet.getInt("start_block_id"), resultSet
+ .getInt("end_block_id"));
+ result.add(dataChunk);
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ private static void trace(String message) {
+
+ LOG.debug(message);
+ }
+
+ public static String getOracleObjectType(Connection connection,
+ OracleTable table) throws SQLException {
+
+ PreparedStatement statement =
+ connection.prepareStatement("SELECT object_type " + " FROM dba_objects"
+ + " WHERE owner = ?" + " and object_name = ?");
+ statement.setString(1, table.getSchema());
+ statement.setString(2, table.getName());
+
+ ResultSet resultSet = statement.executeQuery();
+
+ String result = null;
+ if (resultSet.next()) {
+ result = resultSet.getString("object_type");
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static OracleVersion getOracleVersion(Connection connection)
+ throws SQLException {
+
+ String sql =
+ "SELECT \n"
+ + " v.banner, \n"
+ + " rtrim(v.version) full_version, \n"
+ + " rtrim(v.version_bit) version_bit, \n"
+ + " SUBSTR(v.version, 1, INSTR(v.version, '.', 1, 1)-1) major, \n"
+ + " SUBSTR(v.version, INSTR(v.version, '.', 1, 1) + 1, "
+ + " INSTR(v.version, '.', 1, 2) - INSTR(v.version, '.', 1, 1) - 1) "
+ + " minor, \n"
+ + " SUBSTR(v.version, INSTR(v.version, '.', 1, 2) + 1, "
+ + " INSTR(v.version, '.', 1, 3) - INSTR(v.version, '.', 1, 2) - 1) "
+ + " version, \n"
+ + " SUBSTR(v.version, INSTR(v.version, '.', 1, 3) + 1, "
+ + " INSTR(v.version, '.', 1, 4) - INSTR(v.version, '.', 1, 3) - 1) "
+ + " patch, \n"
+ + " DECODE(instr(v.banner, '64bit'), 0, 'False', 'True') isDb64bit, \n"
+ + " DECODE(instr(b.banner, 'HPUX'), 0, 'False', 'True') isHPUX \n"
+ + "FROM (SELECT rownum row_num, \n"
+ + " banner,\n"
+ + " SUBSTR(SUBSTR(banner,INSTR(banner,'Release ')+8), 1) version_bit,\n"
+ + " SUBSTR(SUBSTR(banner,INSTR(banner,'Release ')+8), 1,\n"
+ + " INSTR(SUBSTR(banner,INSTR(banner,'Release ')+8),' ')) version\n"
+ + "FROM v$version\n" + " WHERE banner LIKE 'Oracle%'\n"
+ + " OR banner LIKE 'Personal Oracle%') v,\n" + "v$version b\n"
+ + " WHERE v.row_num = 1\n" + " and b.banner like 'TNS%'\n";
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ resultSet.next();
+ OracleVersion result =
+ new OracleVersion(resultSet.getInt("major"), resultSet.getInt("minor"),
+ resultSet.getInt("version"), resultSet.getInt("patch"), resultSet
+ .getString("banner"));
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static List<OracleTable> getTables(Connection connection)
+ throws SQLException {
+
+ return getTables(connection, null, null, TableNameQueryType.Equals);
+ }
+
+ private enum GetTablesOptions {
+ Owner, Table
+ }
+
+ private enum TableNameQueryType {
+ Equals, Like
+ }
+
+ public static List<OracleTable>
+ getTables(Connection connection, String owner) throws SQLException {
+
+ return getTables(connection, owner, null, TableNameQueryType.Equals);
+ }
+
+ public static OracleTable getTable(Connection connection, String owner,
+ String tableName) throws SQLException {
+
+ List<OracleTable> tables =
+ getTables(connection, owner, tableName, TableNameQueryType.Equals);
+ if (tables.size() > 0) {
+ return tables.get(0);
+ }
+
+ return null;
+ }
+
+ public static List<OracleTable> getTablesWithTableNameLike(
+ Connection connection, String owner, String tableNameLike)
+ throws SQLException {
+
+ return getTables(connection, owner, tableNameLike, TableNameQueryType.Like);
+ }
+
+ private static List<OracleTable> getTables(Connection connection,
+ String owner, String tableName, TableNameQueryType tableNameQueryType)
+ throws SQLException {
+
+ EnumSet<GetTablesOptions> options = EnumSet.noneOf(GetTablesOptions.class);
+
+ if (owner != null && !owner.isEmpty()) {
+ options.add(GetTablesOptions.Owner);
+ }
+
+ if (tableName != null && !tableName.isEmpty()) {
+ options.add(GetTablesOptions.Table);
+ }
+
+ String sql =
+ "SELECT owner, table_name " + " FROM dba_tables" + " %s %s %s %s "
+ + " ORDER BY owner, table_name";
+
+ String tableComparitor = null;
+ switch (tableNameQueryType) {
+ case Equals:
+ tableComparitor = "=";
+ break;
+ case Like:
+ tableComparitor = "LIKE";
+ break;
+ default:
+ throw new RuntimeException("Operator not implemented.");
+ }
+
+ sql =
+ String.format(sql, options.isEmpty() ? "" : "WHERE", options
+ .contains(GetTablesOptions.Owner) ? "owner = ?" : "", options
+ .containsAll(EnumSet.of(GetTablesOptions.Owner,
+ GetTablesOptions.Table)) ? "AND" : "", options
+ .contains(GetTablesOptions.Table) ? String.format(
+ "table_name %s ?", tableComparitor) : "");
+
+ PreparedStatement statement = connection.prepareStatement(sql);
+
+ if (options.containsAll(EnumSet.of(GetTablesOptions.Owner,
+ GetTablesOptions.Table))) {
+ statement.setString(1, owner);
+ statement.setString(2, tableName);
+ } else {
+ if (options.contains(GetTablesOptions.Owner)) {
+ statement.setString(1, owner);
+ } else if (options.contains(GetTablesOptions.Table)) {
+ statement.setString(1, tableName);
+ }
+ }
+
+ ResultSet resultSet = statement.executeQuery();
+
+ ArrayList<OracleTable> result = new ArrayList<OracleTable>();
+ while (resultSet.next()) {
+ result.add(new OracleTable(resultSet.getString("owner"), resultSet
+ .getString("table_name")));
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static List<String> getTableColumnNames(Connection connection,
+ OracleTable table) throws SQLException {
+
+ OracleTableColumns oracleTableColumns = getTableColumns(connection, table);
+ List<String> result = new ArrayList<String>(oracleTableColumns.size());
+
+ for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+ result.add(oracleTableColumns.get(idx).getName());
+ }
+
+ return result;
+ }
+
+ public static List<String> getTableColumnNames(Connection connection,
+ OracleTable table, boolean omitLobAndLongColumnsDuringImport,
+ OraOopConstants.Sqoop.Tool sqoopTool, boolean onlyOraOopSupportedTypes,
+ boolean omitOraOopPseudoColumns) throws SQLException {
+
+ OracleTableColumns oracleTableColumns =
+ getTableColumns(connection, table, omitLobAndLongColumnsDuringImport,
+ sqoopTool, onlyOraOopSupportedTypes, omitOraOopPseudoColumns);
+
+ List<String> result = new ArrayList<String>(oracleTableColumns.size());
+
+ for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+ result.add(oracleTableColumns.get(idx).getName());
+ }
+
+ return result;
+ }
+
+ private static OracleTableColumns getTableColumns(Connection connection,
+ OracleTable table, boolean omitLobColumns, String dataTypesClause,
+ HashSet<String> columnNamesToOmit) throws SQLException {
+
+ String sql =
+ "SELECT column_name, data_type " + " FROM dba_tab_columns"
+ + " WHERE owner = ?" + " and table_name = ?" + " %s"
+ + " ORDER BY column_id";
+
+ sql =
+ String.format(sql, dataTypesClause == null ? "" : " and "
+ + dataTypesClause);
+
+ LOG.debug(String.format("%s : sql = \n%s", OraOopUtilities
+ .getCurrentMethodName(), sql));
+
+ OracleTableColumns result = new OracleTableColumns();
+ PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, getTableSchema(connection, table));
+ statement.setString(2, table.getName());
+
+ ResultSet resultSet = statement.executeQuery();
+
+ while (resultSet.next()) {
+
+ String columnName = resultSet.getString("column_name");
+
+ if (columnNamesToOmit != null) {
+ if (columnNamesToOmit.contains(columnName)) {
+ continue;
+ }
+ }
+
+ result.add(new OracleTableColumn(columnName, resultSet
+ .getString("data_type")));
+ }
+
+ resultSet.close();
+ statement.close();
+
+ // Now get the actual JDBC data-types for these columns...
+ StringBuilder columnList = new StringBuilder();
+ for (int idx = 0; idx < result.size(); idx++) {
+ if (idx > 0) {
+ columnList.append(",");
+ }
+ columnList.append(result.get(idx).getName());
+ }
+ sql =
+ String.format("SELECT %s FROM %s WHERE 0=1", columnList.toString(),
+ table.toString());
+ Statement statementDesc = connection.createStatement();
+ ResultSet resultSetDesc = statementDesc.executeQuery(sql);
+ ResultSetMetaData metaData = resultSetDesc.getMetaData();
+ for (int idx = 0; idx < metaData.getColumnCount(); idx++) {
+ result.get(idx).setOracleType(metaData.getColumnType(idx + 1)); // <- JDBC
+ // is
+ // 1-based
+ }
+ resultSetDesc.close();
+ statementDesc.close();
+
+ return result;
+ }
+
+ public static OracleTableColumns getTableColumns(Connection connection,
+ OracleTable table) throws SQLException {
+
+ return getTableColumns(connection, table, false, null // <- dataTypesClause
+ , null); // <-columnNamesToOmit
+ }
+
+ public static OracleTableColumns getTableColumns(Connection connection,
+ OracleTable table, boolean omitLobAndLongColumnsDuringImport,
+ OraOopConstants.Sqoop.Tool sqoopTool, boolean onlyOraOopSupportedTypes,
+ boolean omitOraOopPseudoColumns) throws SQLException {
+
+ String dataTypesClause = "";
+ HashSet<String> columnNamesToOmit = null;
+
+ if (onlyOraOopSupportedTypes) {
+
+ switch (sqoopTool) {
+
+ case UNKNOWN:
+ throw new InvalidParameterException(
+ "The sqoopTool parameter must not be \"UNKNOWN\".");
+
+ case IMPORT:
+ dataTypesClause =
+ OraOopConstants.SUPPORTED_IMPORT_ORACLE_DATA_TYPES_CLAUSE;
+
+ if (omitLobAndLongColumnsDuringImport) {
+ LOG.info("LOB and LONG columns are being omitted from the Import.");
+ dataTypesClause =
+ " DATA_TYPE not in ('BLOB', 'CLOB', 'NCLOB', 'LONG') and "
+ + dataTypesClause;
+ }
+ break;
+
+ case EXPORT:
+ dataTypesClause =
+ OraOopConstants.SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE;
+ break;
+
+ default:
+ throw new InvalidParameterException("Sqoop Tool not implemented.");
+
+ }
+ }
+
+ if (omitOraOopPseudoColumns) {
+
+ switch (sqoopTool) {
+
+ case EXPORT:
+ if (columnNamesToOmit == null) {
+ columnNamesToOmit = new HashSet<String>();
+ }
+ columnNamesToOmit.add(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION);
+ columnNamesToOmit
+ .add(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION);
+ columnNamesToOmit.add(OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW);
+ break;
+ default:
+ // Only applicable for export.
+ break;
+ }
+ }
+
+ return getTableColumns(connection, table,
+ omitLobAndLongColumnsDuringImport, dataTypesClause, columnNamesToOmit);
+ }
+
+ public static List<OracleActiveInstance> getOracleActiveInstances(
+ Connection connection) throws SQLException {
+
+ // Returns null if there are no rows in v$active_instances - which indicates
+ // this Oracle database is not a RAC.
+ ArrayList<OracleActiveInstance> result = null;
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select inst_name from v$active_instances ");
+
+ while (resultSet.next()) {
+ String instName = resultSet.getString("inst_name");
+ String[] nameFragments = instName.split(":");
+
+ if (nameFragments.length != 2) {
+ throw new SQLException(
+ "Parsing Error: The inst_name column of v$active_instances does "
+ + "not contain two values separated by a colon.");
+ }
+
+ String hostName = nameFragments[0].trim();
+ String instanceName = nameFragments[1].trim();
+
+ if (hostName.isEmpty()) {
+ throw new SQLException(
+ "Parsing Error: The inst_name column of v$active_instances does "
+ + "not include a host name.");
+ }
+
+ if (instanceName.isEmpty()) {
+ throw new SQLException(
+ "Parsing Error: The inst_name column of v$active_instances does "
+ + "not include an instance name.");
+ }
+
+ OracleActiveInstance instance = new OracleActiveInstance();
+ instance.setHostName(hostName);
+ instance.setInstanceName(instanceName);
+
+ if (result == null) {
+ result = new ArrayList<OracleActiveInstance>();
+ }
+
+ result.add(instance);
+ }
+
+ resultSet.close();
+ statement.close();
+ return result;
+ }
+
+ public static String getCurrentOracleInstanceName(Connection connection)
+ throws SQLException {
+
+ String result = "";
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select instance_name from v$instance");
+
+ if (resultSet.next()) {
+ result = resultSet.getString("instance_name");
+ }
+
+ resultSet.close();
+ statement.close();
+ return result;
+ }
+
+ public static Object getSysDate(Connection connection) throws SQLException {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("select sysdate from dual");
+
+ resultSet.next();
+ try {
+ Method method = oracleResultSetClass.getMethod("getDATE", int.class);
+ return method.invoke(resultSet, 1);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not get sysdate", e);
+ }
+ } finally {
+ resultSet.close();
+ statement.close();
+ }
+ }
+
+ public static String oraDATEToString(Object date, String format) {
+ try {
+ Method dateMethod =
+ oracleDateClass.getMethod("toText", String.class, String.class);
+ return (String) dateMethod.invoke(date, format, null);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format(
+ "Unable to convert the oracle.sql.DATE value \"%s\" to text.", date
+ .toString()), e);
+ }
+ }
+
+ public static Object oraDATEFromString(String date, String format) {
+ try {
+ Method dateMethod =
+ oracleDateClass.getMethod("fromText", String.class, String.class,
+ String.class);
+ return dateMethod.invoke(null, date, format, null);
+ } catch (Exception e) {
+ throw new RuntimeException(String
+ .format(
+ "Unable to convert the String value \"%s\" to oracle.sql.DATE.",
+ date), e);
+ }
+ }
+
+ public static Date oraDATEToDate(Object date) {
+ try {
+ Method dateMethod = oracleDateClass.getMethod("dateValue");
+ return (Date) dateMethod.invoke(date);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not get sysdate", e);
+ }
+ }
+
+ public static String getSysTimeStamp(Connection connection)
+ throws SQLException {
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery("select systimestamp from dual");
+
+ resultSet.next();
+
+ try {
+ Method method = oracleResultSetClass.getMethod("getTIMESTAMP", int.class);
+ Object timestamp = method.invoke(resultSet, 1);
+ return timestamp.toString();
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not get sysdate", e);
+ }
+ } finally {
+ resultSet.close();
+ statement.close();
+ }
+ }
+
+ public static boolean isTableAnIndexOrganizedTable(Connection connection,
+ OracleTable table) throws SQLException {
+
+ /*
+ * http://ss64.com/orad/DBA_TABLES.html IOT_TYPE: If index-only table,then
+ * IOT_TYPE is IOT or IOT_OVERFLOW or IOT_MAPPING else NULL
+ */
+
+ boolean result = false;
+
+ PreparedStatement statement =
+ connection.prepareStatement("select iot_type " + "from dba_tables "
+ + "where owner = ? " + "and table_name = ?");
+ statement.setString(1, table.getSchema());
+ statement.setString(2, table.getName());
+ ResultSet resultSet = statement.executeQuery();
+
+ if (resultSet.next()) {
+ String iotType = resultSet.getString("iot_type");
+ result = iotType != null && !iotType.isEmpty();
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static void dropTable(Connection connection, OracleTable table)
+ throws SQLException {
+
+ String sql = String.format("DROP TABLE %s", table.toString());
+
+ Statement statement = connection.createStatement();
+ try {
+ statement.execute(sql);
+ } catch (SQLException ex) {
+ if (ex.getErrorCode() != 942) { // ORA-00942: table or view does not exist
+ throw ex;
+ }
+ }
+ statement.close();
+ }
+
+ public static void
+ exchangeSubpartition(Connection connection, OracleTable table,
+ String subPartitionName, OracleTable subPartitionTable)
+ throws SQLException {
+
+ Statement statement = connection.createStatement();
+ String sql =
+ String.format("ALTER TABLE %s EXCHANGE SUBPARTITION %s WITH TABLE %s",
+ table.toString(), subPartitionName, subPartitionTable.toString());
+ statement.execute(sql);
+ statement.close();
+ }
+
+ public static void createExportTableFromTemplate(Connection connection,
+ OracleTable newTable, String tableStorageClause,
+ OracleTable templateTable, boolean noLogging) throws SQLException {
+
+ String sql =
+ String.format("CREATE TABLE %s \n" + "%s %s \n" + "AS \n"
+ + "(SELECT * FROM %s WHERE 0=1)", newTable.toString(),
+ noLogging ? "NOLOGGING" : "", tableStorageClause, templateTable
+ .toString());
+
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ statement.close();
+ }
+
+ private static Object oraDATEAddJulianDays(Object date, int julianDay,
+ int julianSec) {
+ try {
+ Constructor<?> dateCon = oracleDateClass.getConstructor(byte[].class);
+ Method dateBytes = oracleDateClass.getMethod("toBytes");
+ Object result = dateCon.newInstance(dateBytes.invoke(date));
+ Method dateAdd =
+ oracleDateClass.getMethod("addJulianDays", int.class, int.class);
+ result = dateAdd.invoke(result, julianDay, julianSec);
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not add days to date.", e);
+ }
+ }
+
+ public static void createExportTableFromTemplateWithPartitioning(
+ Connection connection, OracleTable newTable, String tableStorageClause,
+ OracleTable templateTable, boolean noLogging, String partitionName,
+ Object jobDateTime, int numberOfMappers, String[] subPartitionNames)
+ throws SQLException {
+
+ String dateFormat = "yyyy-mm-dd hh24:mi:ss";
+
+ Object partitionBound =
+ OraOopOracleQueries.oraDATEAddJulianDays(jobDateTime, 0, 1);
+
+ String partitionBoundStr =
+ OraOopOracleQueries.oraDATEToString(partitionBound, dateFormat);
+
+ StringBuilder subPartitions = new StringBuilder();
+ for (int idx = 0; idx < numberOfMappers; idx++) {
+ if (idx > 0) {
+ subPartitions.append(",");
+ }
+
+ subPartitions.append(String.format(" SUBPARTITION %s VALUES (%d)",
+ subPartitionNames[idx], idx));
+ }
+
+ String sql =
+ String.format(
+ "CREATE TABLE %s \n" + "%s %s \n" + "PARTITION BY RANGE (%s) \n"
+ + "SUBPARTITION BY LIST (%s) \n" + "(PARTITION %s \n"
+ + "VALUES LESS THAN (to_date('%s', '%s')) \n" + "( %s ) \n"
+ + ") \n" + "AS \n"
+ + "(SELECT t.*, sysdate %s, 0 %s, 0 %s FROM %s t \n"
+ + "WHERE 0=1)", newTable.toString(), noLogging ? "NOLOGGING"
+ : "", tableStorageClause,
+ OraOopConstants.COLUMN_NAME_EXPORT_PARTITION,
+ OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION, partitionName,
+ partitionBoundStr, dateFormat, subPartitions.toString(),
+ OraOopConstants.COLUMN_NAME_EXPORT_PARTITION,
+ OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+ OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW, templateTable
+ .toString());
+
+ LOG.debug(String.format("SQL generated by %s:\n%s", OraOopUtilities
+ .getCurrentMethodName(), sql));
+
+ try {
+
+ // Create the main export table...
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ preparedStatement.execute(sql);
+ preparedStatement.close();
+ } catch (SQLException ex) {
+ LOG.error(String
+ .format(
+ "The error \"%s\" was encountered when executing the following "
+ + "SQL statement:\n%s",
+ ex.getMessage(), sql));
+ throw ex;
+ }
+ }
+
+ public static void createExportTableForMapper(Connection connection,
+ OracleTable table, String tableStorageClause, OracleTable templateTable,
+ boolean addOraOopPartitionColumns) throws SQLException {
+
+ String sql = "";
+ try {
+
+ // Create the N tables to be used by the mappers...
+ Statement statement = connection.createStatement();
+ if (addOraOopPartitionColumns) {
+ sql =
+ String.format("CREATE TABLE %s \n" + "NOLOGGING %s \n" + "AS \n"
+ + "(SELECT t.*, SYSDATE %s, 0 %s, 0 %s FROM %s t WHERE 0=1)",
+ table.toString(), tableStorageClause,
+ OraOopConstants.COLUMN_NAME_EXPORT_PARTITION,
+ OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION,
+ OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW, templateTable
+ .toString());
+ } else {
+ sql =
+ String.format("CREATE TABLE %s \n" + "NOLOGGING %s \n" + "AS \n"
+ + "(SELECT * FROM %s WHERE 0=1)", table.toString(),
+ tableStorageClause, templateTable.toString());
+ }
+
+ LOG.info(String.format("SQL generated by %s:\n%s", OraOopUtilities
+ .getCurrentMethodName(), sql));
+
+ statement.execute(sql);
+ statement.close();
+ } catch (SQLException ex) {
+ LOG.error(String
+ .format(
+ "The error \"%s\" was encountered when executing the following "
+ + "SQL statement:\n%s",
+ ex.getMessage(), sql));
+ throw ex;
+ }
+ }
+
+ // public static void createExportTablesForMappers(Connection connection
+ // ,String[] mapperTableNames
+ // ,OracleTable templateTable
+ // ,boolean addOraOopPartitionColumns)
+ // throws SQLException {
+ //
+ // for(String tableName : mapperTableNames)
+ // createExportTableForMapper(connection, tableName, templateTable,
+ // addOraOopPartitionColumns);
+ // }
+
+ public static void createMoreExportTablePartitions(Connection connection,
+ OracleTable table, String partitionName, Object jobDateTime,
+ String[] subPartitionNames) throws SQLException {
+
+ String dateFormat = "yyyy-mm-dd hh24:mi:ss";
+
+ Object partitionBound =
+ OraOopOracleQueries.oraDATEAddJulianDays(jobDateTime, 0, 1);
+ String partitionBoundStr =
+ OraOopOracleQueries.oraDATEToString(partitionBound, dateFormat);
+
+ StringBuilder subPartitions = new StringBuilder();
+ for (int idx = 0; idx < subPartitionNames.length; idx++) {
+ if (idx > 0) {
+ subPartitions.append(",");
+ }
+
+ subPartitions.append(String.format(" SUBPARTITION %s VALUES (%d)",
+ subPartitionNames[idx], idx));
+ }
+
+ String sql =
+ String.format("ALTER TABLE %s " + "ADD PARTITION %s "
+ + "VALUES LESS THAN (to_date('%s', '%s'))" + "( %s ) ", table
+ .toString(), partitionName, partitionBoundStr, dateFormat,
+ subPartitions.toString());
+
+ LOG.debug(String.format("SQL generated by %s:\n%s", OraOopUtilities
+ .getCurrentMethodName(), sql));
+
+ try {
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ preparedStatement.execute(sql);
+ preparedStatement.close();
+
+ } catch (SQLException ex) {
+ LOG.error(String
+ .format(
+ "The error \"%s\" was encountered when executing the following "
+ + "SQL statement:\n%s",
+ ex.getMessage(), sql));
+ throw ex;
+ }
+ }
+
+ public static void mergeTable(Connection connection, OracleTable targetTable,
+ OracleTable sourceTable, String[] mergeColumnNames,
+ OracleTableColumns oracleTableColumns, Object oraOopSysDate,
+ int oraOopMapperId, boolean parallelizationEnabled) throws SQLException {
+
+ StringBuilder updateClause = new StringBuilder();
+ StringBuilder insertClause = new StringBuilder();
+ StringBuilder valuesClause = new StringBuilder();
+ for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+ OracleTableColumn oracleTableColumn = oracleTableColumns.get(idx);
+ String columnName = oracleTableColumn.getName();
+
+ if (insertClause.length() > 0) {
+ insertClause.append(",");
+ }
+ insertClause.append(String.format("target.%s", columnName));
+
+ if (valuesClause.length() > 0) {
+ valuesClause.append(",");
+ }
+ valuesClause.append(String.format("source.%s", columnName));
+
+ if (!OraOopUtilities.stringArrayContains(mergeColumnNames, columnName,
+ true)) {
+
+ // If we're performing a merge, then the table is not partitioned. (If
+ // the table
+ // was partitioned, we'd be deleting and then inserting rows.)
+ if (!columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION)
+ && !columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION)
+ && !columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW)) {
+
+ if (updateClause.length() > 0) {
+ updateClause.append(",");
+ }
+ updateClause.append(String.format("target.%1$s = source.%1$s",
+ columnName));
+
+ }
+ }
+ }
+
+ String sourceClause = valuesClause.toString();
+
+ String sql =
+ String.format("MERGE %7$s INTO %1$s target \n"
+ + "USING (SELECT %8$s * FROM %2$s) source \n" + " ON (%3$s) \n"
+ + "WHEN MATCHED THEN \n" + " UPDATE SET %4$s \n"
+ + "WHEN NOT MATCHED THEN \n" + " INSERT (%5$s) \n"
+ + " VALUES (%6$s)", targetTable.toString(),
+ sourceTable.toString(),
+ generateUpdateKeyColumnsWhereClauseFragment(mergeColumnNames,
+ "target", "source"), updateClause.toString(), insertClause
+ .toString(), sourceClause,
+ parallelizationEnabled ? "/*+ append parallel(target) */" : "",
+ parallelizationEnabled ? "/*+parallel*/" : "");
+
+ LOG.info(String.format("Merge SQL statement:\n" + sql));
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ resultSet.close();
+ statement.close();
+ }
+
+ public static void updateTable(Connection connection,
+ OracleTable targetTable, OracleTable sourceTable,
+ String[] mergeColumnNames, OracleTableColumns oracleTableColumns,
+ Object oraOopSysDate, int oraOopMapperId, boolean parallelizationEnabled)
+ throws SQLException {
+
+ StringBuilder targetColumnsClause = new StringBuilder();
+ StringBuilder sourceColumnsClause = new StringBuilder();
+ for (int idx = 0; idx < oracleTableColumns.size(); idx++) {
+ OracleTableColumn oracleTableColumn = oracleTableColumns.get(idx);
+ String columnName = oracleTableColumn.getName();
+
+ if (targetColumnsClause.length() > 0) {
+ targetColumnsClause.append(",");
+ }
+ targetColumnsClause.append(String.format("a.%s", columnName));
+
+ if (sourceColumnsClause.length() > 0) {
+ sourceColumnsClause.append(",");
+ }
+ sourceColumnsClause.append(String.format("b.%s", columnName));
+ }
+
+ String sourceClause = sourceColumnsClause.toString();
+
+ sourceClause =
+ sourceClause.replaceAll(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION,
+ String.format("to_date('%s', 'yyyy/mm/dd hh24:mi:ss')",
+ OraOopOracleQueries.oraDATEToString(oraOopSysDate,
+ "yyyy/mm/dd hh24:mi:ss")));
+
+ sourceClause =
+ sourceClause.replaceAll(
+ OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION, Integer
+ .toString(oraOopMapperId));
+
+ String sql =
+ String.format("UPDATE %5$s %1$s a \n" + "SET \n" + "(%2$s) \n"
+ + "= (SELECT \n" + "%3$s \n" + "FROM %4$s b \n" + "WHERE %6$s) \n"
+ + "WHERE EXISTS (SELECT null FROM %4$s c " + "WHERE %7$s)",
+ targetTable.toString(), targetColumnsClause.toString(),
+ sourceClause, sourceTable.toString(),
+ parallelizationEnabled ? "/*+ parallel */" : "",
+ generateUpdateKeyColumnsWhereClauseFragment(mergeColumnNames, "b",
+ "a"), generateUpdateKeyColumnsWhereClauseFragment(
+ mergeColumnNames, "c", "a"));
+
+ LOG.info(String.format("Update SQL statement:\n" + sql));
+
+ Statement statement = connection.createStatement();
+ int rowsAffected = statement.executeUpdate(sql);
+
+ LOG.info(String.format(
+ "The number of rows affected by the update SQL was: %d", rowsAffected));
+
+ statement.close();
+ }
+
+ /**
+ * Whether new rows should be included in changes table or not.
+ */
+ public enum CreateExportChangesTableOptions {
+ OnlyRowsThatDiffer, RowsThatDifferPlusNewRows
+ }
+
+ public static int createExportChangesTable(Connection connection,
+ OracleTable tableToCreate, String tableToCreateStorageClause,
+ OracleTable tableContainingUpdates, OracleTable tableToBeUpdated,
+ String[] joinColumnNames, CreateExportChangesTableOptions options,
+ boolean parallelizationEnabled) throws SQLException {
+
+ List<String> columnNames =
+ getTableColumnNames(connection, tableToBeUpdated
+ , false // <- omitLobAndLongColumnsDuringImport
+ , OraOopConstants.Sqoop.Tool.EXPORT
+ , true // <- onlyOraOopSupportedTypes
+ , false // <- omitOraOopPseudoColumns
+ );
+
+ StringBuilder columnClause = new StringBuilder(2 * columnNames.size());
+ for (int idx = 0; idx < columnNames.size(); idx++) {
+ if (idx > 0) {
+ columnClause.append(",");
+ }
+ columnClause.append("a." + columnNames.get(idx));
+ }
+
+ StringBuilder rowEqualityClause = new StringBuilder();
+ for (int idx = 0; idx < columnNames.size(); idx++) {
+ String columnName = columnNames.get(idx);
+
+ // We need to omit the OraOop pseudo columns from the SQL statement that
+ // compares the data in
+ // the two tables we're interested in. Otherwise, EVERY row will be
+ // considered to be changed,
+ // since the values in the pseudo columns will differ. (i.e.
+ // ORAOOP_EXPORT_SYSDATE will differ.)
+ if (columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION)
+ || columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION)
+ || columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW)) {
+ continue;
+ }
+
+ if (idx > 0) {
+ rowEqualityClause.append("OR");
+ }
+
+ rowEqualityClause.append(String.format("(a.%1$s <> b.%1$s "
+ + "OR (a.%1$s IS NULL AND b.%1$s IS NOT NULL) "
+ + "OR (a.%1$s IS NOT NULL AND b.%1$s IS NULL))", columnName));
+ }
+
+ String sqlJoin = null;
+ switch (options) {
+
+ case OnlyRowsThatDiffer:
+ sqlJoin = "";
+ break;
+
+ case RowsThatDifferPlusNewRows:
+ sqlJoin = "(+)"; // <- An outer-join will cause the "new" rows to be
+ // included
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the option \"%s\".", OraOopUtilities
+ .getCurrentMethodName(), options.toString()));
+ }
+
+ String sql =
+ String.format("CREATE TABLE %1$s \n" + "NOLOGGING %8$s \n" + "%7$s \n"
+ + "AS \n " + "SELECT \n" + "%5$s \n" + "FROM %2$s a, %3$s b \n"
+ + "WHERE (%4$s) \n" + "AND ( \n" + "%6$s \n" + ")", tableToCreate
+ .toString(), tableContainingUpdates.toString(), tableToBeUpdated
+ .toString(), generateUpdateKeyColumnsWhereClauseFragment(
+ joinColumnNames, "a", "b", sqlJoin), columnClause.toString(),
+ rowEqualityClause.toString(), parallelizationEnabled ? "PARALLEL"
+ : "", tableToCreateStorageClause);
+
+ LOG.info(String.format("The SQL to create the changes-table is:\n%s", sql));
+
+ Statement statement = connection.createStatement();
+
+ long start = System.nanoTime();
+ statement.executeUpdate(sql);
+ double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String.format("Time spent creating change-table: %f sec.",
+ timeInSec));
+
+ String indexName = tableToCreate.toString().replaceAll("CHG", "IDX");
+ start = System.nanoTime();
+ statement.execute(String.format("CREATE INDEX %s ON %s (%s)", indexName,
+ tableToCreate.toString(), OraOopUtilities
+ .stringArrayToCSV(joinColumnNames)));
+ timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String.format("Time spent creating change-table index: %f sec.",
+ timeInSec));
+
+ int changeTableRowCount = 0;
+
+ ResultSet resultSet =
+ statement.executeQuery(String.format("select count(*) from %s",
+ tableToCreate.toString()));
+ resultSet.next();
+ changeTableRowCount = resultSet.getInt(1);
+ LOG.info(String.format("The number of rows in the change-table is: %d",
+ changeTableRowCount));
+
+ statement.close();
+ return changeTableRowCount;
+ }
+
+ public static void deleteRowsFromTable(Connection connection,
+ OracleTable tableToDeleteRowsFrom,
+ OracleTable tableContainingRowsToDelete, String[] joinColumnNames,
+ boolean parallelizationEnabled) throws SQLException {
+
+ String sql =
+ String.format("DELETE %4$s FROM %1$s a \n" + "WHERE EXISTS ( \n"
+ + "SELECT null FROM %3$s b WHERE \n" + "%2$s)",
+ tableToDeleteRowsFrom.toString(),
+ generateUpdateKeyColumnsWhereClauseFragment(joinColumnNames, "a",
+ "b"), tableContainingRowsToDelete.toString(),
+ parallelizationEnabled ? "/*+ parallel */" : "");
+
+ LOG.info(String.format("The SQL to delete rows from a table:\n%s", sql));
+
+ Statement statement = connection.createStatement();
+ int rowsAffected = statement.executeUpdate(sql);
+
+ LOG.info(String.format(
+ "The number of rows affected by the delete SQL was: %d", rowsAffected));
+
+ statement.close();
+ }
+
+ public static void insertRowsIntoExportTable(Connection connection,
+ OracleTable tableToInsertRowsInto,
+ OracleTable tableContainingRowsToInsert, Object oraOopSysDate,
+ int oraOopMapperId, boolean parallelizationEnabled) throws SQLException {
+
+ List<String> columnNames =
+ getTableColumnNames(connection, tableToInsertRowsInto);
+
+ StringBuilder columnClause =
+ new StringBuilder(2 + (2 * columnNames.size()));
+ for (int idx = 0; idx < columnNames.size(); idx++) {
+ if (idx > 0) {
+ columnClause.append(",");
+ }
+ columnClause.append(columnNames.get(idx));
+ }
+
+ String columnsClause = columnClause.toString();
+
+ String sql =
+ String.format("insert %4$s \n" + "into %1$s \n" + "select \n"
+ + "%2$s \n" + "from %3$s", tableToInsertRowsInto.toString(),
+ columnsClause, tableContainingRowsToInsert.toString(),
+ parallelizationEnabled ? "/*+ append parallel */" : "");
+
+ LOG.info(String.format(
+ "The SQL to insert rows from one table into another:\n%s", sql));
+
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ resultSet.close();
+ statement.close();
+ }
+
+ public static boolean doesIndexOnColumnsExist(Connection connection,
+ OracleTable oracleTable, String[] columnNames) throws SQLException {
+
+ // Attempts to find an index on the table that *starts* with the N column
+ // names passed.
+ // These columns can be in any order.
+
+ String columnNamesInClause =
+ OraOopUtilities.stringArrayToCSV(columnNames, "'");
+
+ String sql =
+ String.format("SELECT b.index_name, \n"
+ + " sum(case when b.column_name in (%1$s) then 1 end) num_cols \n"
+ + "FROM dba_indexes a, dba_ind_columns b \n" + "WHERE \n"
+ + "a.owner = b.index_owner \n"
+ + "AND a.index_name = b.index_name \n" + "AND b.table_owner = ? \n"
+ + "AND b.table_name = ? \n" + "AND a.status = 'VALID' \n"
+ + "AND b.column_position <= ? \n" + "GROUP BY b.index_name \n"
+ + "HAVING sum(case when b.column_name in (%1$s) then 1 end) = ?",
+ columnNamesInClause);
+
+ PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, oracleTable.getSchema());
+ statement.setString(2, oracleTable.getName());
+ statement.setInt(3, columnNames.length);
+ statement.setInt(4, columnNames.length);
+
+ LOG.debug(String.format("SQL to find an index on the columns %s:\n%s",
+ columnNamesInClause, sql));
+
+ ResultSet resultSet = statement.executeQuery();
+
+ boolean result = false;
+ if (resultSet.next()) {
+ LOG.debug(String
+ .format(
+ "The table %s has an index named %s starting with the column(s) "
+ + "%s (in any order).",
+ oracleTable.toString(), resultSet.getString("index_name"),
+ columnNamesInClause));
+ result = true;
+ }
+
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ private static String generateUpdateKeyColumnsWhereClauseFragment(
+ String[] joinColumnNames, String prefix1, String prefix2) {
+
+ return generateUpdateKeyColumnsWhereClauseFragment(joinColumnNames,
+ prefix1, prefix2, "");
+ }
+
+ private static String generateUpdateKeyColumnsWhereClauseFragment(
+ String[] joinColumnNames, String prefix1, String prefix2,
+ String sqlJoinOperator) {
+
+ StringBuilder result = new StringBuilder();
+ for (int idx = 0; idx < joinColumnNames.length; idx++) {
+ String joinColumnName = joinColumnNames[idx];
+ if (idx > 0) {
+ result.append(" AND ");
+ }
+ result.append(String.format("%1$s.%3$s = %2$s.%3$s %4$s", prefix1,
+ prefix2, joinColumnName, sqlJoinOperator));
+ }
+ return result.toString();
+ }
+
+ public static String getCurrentSchema(Connection connection)
+ throws SQLException {
+ String sql = "SELECT SYS_CONTEXT('USERENV','CURRENT_SCHEMA') FROM DUAL";
+
+ PreparedStatement statement = connection.prepareStatement(sql);
+
+ ResultSet resultSet = statement.executeQuery();
+
+ resultSet.next();
+ String result = resultSet.getString(1);
+
+ resultSet.close();
+ statement.close();
+
+ LOG.info("Current schema is: " + result);
+
+ return result;
+ }
+
+ public static String getTableSchema(Connection connection, OracleTable table)
+ throws SQLException {
+ if (table.getSchema() == null || table.getSchema().isEmpty()) {
+ return getCurrentSchema(connection);
+ } else {
+ return table.getSchema();
+ }
+ }
+
+ public static long getCurrentScn(Connection connection) throws SQLException {
+ String sql = "SELECT current_scn FROM v$database";
+ PreparedStatement statement = connection.prepareStatement(sql);
+ ResultSet resultSet = statement.executeQuery();
+
+ resultSet.next();
+ long result = resultSet.getLong(1);
+ resultSet.close();
+ statement.close();
+
+ return result;
+ }
+
+ public static void setLongAtName(PreparedStatement statement,
+ String bindName, long bindValue) throws SQLException {
+ try {
+ methSetLongAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setBigDecimalAtName(PreparedStatement statement,
+ String bindName, BigDecimal bindValue) throws SQLException {
+ try {
+ methSetBigDecimalAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setStringAtName(PreparedStatement statement,
+ String bindName, String bindValue) throws SQLException {
+ try {
+ methSetStringAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setTimestampAtName(PreparedStatement statement,
+ String bindName, Timestamp bindValue) throws SQLException {
+ try {
+ methSetTimestampAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setBinaryDoubleAtName(PreparedStatement statement,
+ String bindName, double bindValue) throws SQLException {
+ try {
+ methSetBinaryDoubleAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setObjectAtName(PreparedStatement statement,
+ String bindName, Object bindValue) throws SQLException {
+ try {
+ methSetObjectAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setBinaryFloatAtName(PreparedStatement statement,
+ String bindName, float bindValue) throws SQLException {
+ try {
+ methSetBinaryFloatAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static void setIntAtName(PreparedStatement statement, String bindName,
+ int bindValue) throws SQLException {
+ try {
+ methSetIntAtName.invoke(statement, bindName, bindValue);
+ } catch (Exception e) {
+ if (e.getCause() instanceof SQLException) {
+ throw (SQLException) e.getCause();
+ } else {
+ throw new RuntimeException("Could not set bind variable", e);
+ }
+ }
+ }
+
+ public static int getOracleType(String name) {
+ Integer result = ORACLE_TYPES.get(name);
+ if (result == null) {
+ synchronized (ORACLE_TYPES) {
+ try {
+ result = oracleTypesClass.getField(name).getInt(null);
+ ORACLE_TYPES.put(name, result);
+ } catch (Exception e) {
+ throw new RuntimeException("Invalid oracle type specified", e);
+ }
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatBase.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatBase.java
new file mode 100644
index 0000000..7c4d1c5
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatBase.java
@@ -0,0 +1,713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+abstract class OraOopOutputFormatBase<K extends SqoopRecord, V> extends
+ ExportOutputFormat<K, V> {
+
+ private static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopOutputFormatBase.class);
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+
+ super.checkOutputSpecs(context);
+
+ Configuration conf = context.getConfiguration();
+
+ // This code is now running on a Datanode in the Hadoop cluster, so we
+ // need to enable debug logging in this JVM...
+ OraOopUtilities.enableDebugLoggingIfRequired(conf);
+ }
+
+ protected int getMapperId(TaskAttemptContext context) {
+
+ return context.getTaskAttemptID().getTaskID().getId();
+ }
+
+ protected void applyMapperJdbcUrl(TaskAttemptContext context, int mapperId) {
+
+ Configuration conf = context.getConfiguration();
+
+ // Retrieve the JDBC URL that should be used by this mapper.
+ // We achieve this by modifying the JDBC URL property in the
+ // configuration, prior to the OraOopDBRecordWriter's (ancestral)
+ // constructor using the configuration to establish a connection
+ // to the database - via DBConfiguration.getConnection()...
+ String mapperJdbcUrlPropertyName =
+ OraOopUtilities.getMapperJdbcUrlPropertyName(mapperId, conf);
+
+ // Get this mapper's JDBC URL
+ String mapperJdbcUrl = conf.get(mapperJdbcUrlPropertyName, null);
+
+ LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
+ mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));
+
+ if (mapperJdbcUrl != null) {
+ conf.set(DBConfiguration.URL_PROPERTY, mapperJdbcUrl);
+ }
+ }
+
+ protected boolean canUseOracleAppendValuesHint(TaskAttemptContext context) {
+
+ Configuration conf = context.getConfiguration();
+
+ // Should we use the APPEND_VALUES Oracle hint?...
+ // (Yes, if this is Oracle 11.2 or above)...
+ OracleVersion oracleVersion =
+ new OracleVersion(conf.getInt(
+ OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MAJOR, 0), conf
+ .getInt(OraOopConstants.ORAOOP_ORACLE_DATABASE_VERSION_MINOR, 0),
+ 0, 0, "");
+
+ boolean result = oracleVersion.isGreaterThanOrEqualTo(11, 2, 0, 0);
+
+ // If there is a BINARY_DOUBLE or BINARY_FLOAT column, then we'll avoid
+ // using
+ // the APPEND_VALUES hint. If there is a NULL in the HDFS file, then we'll
+ // encounter
+ // "ORA-12838: cannot read/modify an object after modifying it in parallel"
+ // due to the JDBC driver issuing the INSERT statement twice to the database
+ // without a COMMIT in between (as was observed via WireShark).
+ // We're not sure why this happens - we just know how to avoid it.
+ if (result) {
+ boolean binaryDoubleColumnExists =
+ conf.getBoolean(OraOopConstants.TABLE_CONTAINS_BINARY_DOUBLE_COLUMN,
+ false);
+ boolean binaryFloatColumnExists =
+ conf.getBoolean(OraOopConstants.TABLE_CONTAINS_BINARY_FLOAT_COLUMN,
+ false);
+ if (binaryDoubleColumnExists || binaryFloatColumnExists) {
+ result = false;
+ LOG.info("The APPEND_VALUES Oracle hint will not be used for the "
+ + "INSERT SQL statement, as the Oracle table "
+ + "contains either a BINARY_DOUBLE or BINARY_FLOAT column.");
+ }
+ }
+
+ return result;
+ }
+
+ protected boolean allowUserToOverrideUseOfTheOracleAppendValuesHint(
+ TaskAttemptContext context, boolean useAppendValuesOracleHint) {
+
+ Configuration conf = context.getConfiguration();
+
+ boolean result = useAppendValuesOracleHint;
+
+ // Has the user forced the use of APPEND_VALUES either on or off?...
+ switch (OraOopUtilities.getOracleAppendValuesHintUsage(conf)) {
+
+ case OFF:
+ result = false;
+ LOG.debug(String
+ .format(
+ "Use of the APPEND_VALUES Oracle hint has been forced OFF. "
+ + "(It was %s to used).",
+ useAppendValuesOracleHint ? "going" : "not going"));
+ break;
+
+ case ON:
+ result = true;
+ LOG.debug(String
+ .format(
+ "Use of the APPEND_VALUES Oracle hint has been forced ON. "
+ + "(It was %s to used).",
+ useAppendValuesOracleHint ? "going" : "not going"));
+ break;
+
+ case AUTO:
+ LOG.debug(String.format("The APPEND_VALUES Oracle hint %s be used.",
+ result ? "will" : "will not"));
+ break;
+
+ default:
+ throw new RuntimeException("Invalid value for APPEND_VALUES.");
+ }
+ return result;
+ }
+
+ protected void updateBatchSizeInConfigurationToAllowOracleAppendValuesHint(
+ TaskAttemptContext context) {
+
+ Configuration conf = context.getConfiguration();
+
+ // If using APPEND_VALUES, check the batch size and commit frequency...
+ int originalBatchesPerCommit =
+ conf.getInt(AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 0);
+ if (originalBatchesPerCommit != 1) {
+ conf.setInt(AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 1);
+ LOG.info(String
+ .format(
+ "The number of batch-inserts to perform per commit has been "
+ + "changed from %d to %d. This is in response "
+ + "to the Oracle APPEND_VALUES hint being used.",
+ originalBatchesPerCommit, 1));
+ }
+
+ int originalBatchSize =
+ conf.getInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 0);
+ int minAppendValuesBatchSize =
+ OraOopUtilities.getMinAppendValuesBatchSize(conf);
+ if (originalBatchSize < minAppendValuesBatchSize) {
+ conf.setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
+ minAppendValuesBatchSize);
+ LOG.info(String
+ .format(
+ "The number of rows per batch-insert has been changed from %d "
+ + "to %d. This is in response "
+ + "to the Oracle APPEND_VALUES hint being used.",
+ originalBatchSize, minAppendValuesBatchSize));
+ }
+ }
+
+ abstract class OraOopDBRecordWriterBase extends
+ ExportOutputFormat<K, V>.ExportRecordWriter<K, V> {
+
+ protected OracleTable oracleTable; // <- If exporting into a partitioned
+ // table, this table will be unique for
+ // this mapper
+ private OracleTableColumns oracleTableColumns; // <- The columns in the
+ // table we're inserting rows
+ // into
+ protected int mapperId; // <- The index of this Hadoop mapper
+ protected boolean tableHasMapperRowNumberColumn; // <- Whether the export
+ // table contain the column
+ // ORAOOP_MAPPER_ROW
+ protected long mapperRowNumber; // <- The 1-based row number being processed
+ // by this mapper. It's inserted into the
+ // "ORAOOP_MAPPER_ROW" column
+
+ public OraOopDBRecordWriterBase(TaskAttemptContext context, int mapperId)
+ throws ClassNotFoundException, SQLException {
+
+ super(context);
+ this.mapperId = mapperId;
+ this.mapperRowNumber = 1;
+
+ Configuration conf = context.getConfiguration();
+
+ // Log any info that might be useful to us...
+ logBatchSettings();
+
+ // Connect to Oracle...
+ Connection connection = this.getConnection();
+
+ String thisOracleInstanceName =
+ OraOopOracleQueries.getCurrentOracleInstanceName(connection);
+ LOG.info(String.format(
+ "This record writer is connected to Oracle via the JDBC URL: \n"
+ + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", connection
+ .toString(), thisOracleInstanceName));
+
+ // Initialize the Oracle session...
+ OracleConnectionFactory.initializeOracleConnection(connection, conf);
+ connection.setAutoCommit(false);
+ }
+
+ protected void setOracleTableColumns(
+ OracleTableColumns newOracleTableColumns) {
+
+ this.oracleTableColumns = newOracleTableColumns;
+ this.tableHasMapperRowNumberColumn =
+ this.oracleTableColumns.findColumnByName(
+ OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW) != null;
+ }
+
+ protected OracleTableColumns getOracleTableColumns() {
+
+ return this.oracleTableColumns;
+ }
+
+ protected void getExportTableAndColumns(TaskAttemptContext context)
+ throws SQLException {
+
+ Configuration conf = context.getConfiguration();
+
+ String schema =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
+ String localTableName =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
+
+ if (schema == null || schema.isEmpty() || localTableName == null
+ || localTableName.isEmpty()) {
+ throw new RuntimeException(
+ "Unable to recall the schema and name of the Oracle table "
+ + "being exported.");
+ }
+
+ this.oracleTable = new OracleTable(schema, localTableName);
+
+ setOracleTableColumns(OraOopOracleQueries.getTableColumns(this
+ .getConnection(), this.oracleTable, OraOopUtilities
+ .omitLobAndLongColumnsDuringImport(conf), OraOopUtilities
+ .recallSqoopJobType(conf), true // <- onlyOraOopSupportedTypes
+ , false // <- omitOraOopPseudoColumns
+ ));
+ }
+
+ @Override
+ protected PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ Connection connection = this.getConnection();
+
+ String sql = getBatchSqlStatement();
+ LOG.debug(String.format("Prepared Statement SQL:\n%s", sql));
+
+ PreparedStatement statement;
+
+ try {
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ synchronized (connection) {
+ statement = connection.prepareStatement(sql);
+ }
+
+ configurePreparedStatement(statement, userRecords);
+ } catch (Exception ex) {
+ if (ex instanceof SQLException) {
+ throw (SQLException) ex;
+ } else {
+ LOG.error(String.format("The following error occurred during %s",
+ OraOopUtilities.getCurrentMethodName()), ex);
+ throw new SQLException(ex);
+ }
+ }
+
+ return statement;
+ }
+
+ @Override
+ protected boolean isBatchExec() {
+
+ return true;
+ }
+
+ @Override
+ protected String getInsertStatement(int numRows) {
+
+ throw new UnsupportedOperationException(String.format(
+ "%s should not be called, as %s operates in batch mode.",
+ OraOopUtilities.getCurrentMethodName(), this.getClass().getName()));
+ }
+
+ protected String getBatchInsertSqlStatement(String oracleHint) {
+
+ // String[] columnNames = this.getColumnNames();
+ StringBuilder sqlNames = new StringBuilder();
+ StringBuilder sqlValues = new StringBuilder();
+
+ /*
+ * NOTE: "this.oracleTableColumns" may contain a different list of columns
+ * than "this.getColumnNames()". This is because: (1)
+ * "this.getColumnNames()" includes columns with data-types that are not
+ * supported by OraOop. (2) "this.oracleTableColumns" includes any
+ * pseudo-columns that we've added to the export table (and don't exist in
+ * the HDFS file being read). For example, if exporting to a partitioned
+ * table (that OraOop created), there are two pseudo-columns we added to
+ * the table to identify the export job and the mapper.
+ */
+
+ int colCount = 0;
+ for (int idx = 0; idx < this.oracleTableColumns.size(); idx++) {
+ OracleTableColumn oracleTableColumn = this.oracleTableColumns.get(idx);
+ String columnName = oracleTableColumn.getName();
+
+ // column names...
+ if (colCount > 0) {
+ sqlNames.append("\n,");
+ }
+ sqlNames.append(columnName);
+
+ // column values...
+ if (colCount > 0) {
+ sqlValues.append("\n,");
+ }
+
+ String pseudoColumnValue =
+ generateInsertValueForPseudoColumn(columnName);
+
+ String bindVarName = null;
+
+ if (pseudoColumnValue != null) {
+ bindVarName = pseudoColumnValue;
+ } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+ .getOracleType("STRUCT")) {
+ if (oracleTableColumn.getDataType().equals(
+ OraOopConstants.Oracle.URITYPE)) {
+ bindVarName =
+ String.format("urifactory.getUri(%s)",
+ columnNameToBindVariable(columnName));
+ }
+ } else if (getConf().getBoolean(
+ OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING,
+ OraOopConstants.ORAOOP_MAP_TIMESTAMP_AS_STRING_DEFAULT)) {
+ if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+ .getOracleType("DATE")) {
+ bindVarName =
+ String.format("to_date(%s, 'yyyy-mm-dd hh24:mi:ss')",
+ columnNameToBindVariable(columnName));
+ } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMP")) {
+ bindVarName =
+ String.format("to_timestamp(%s, 'yyyy-mm-dd hh24:mi:ss.ff')",
+ columnNameToBindVariable(columnName));
+ } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMPTZ")) {
+ bindVarName =
+ String.format(
+ "to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')",
+ columnNameToBindVariable(columnName));
+ } else if (oracleTableColumn.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMPLTZ")) {
+ bindVarName =
+ String.format(
+ "to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')",
+ columnNameToBindVariable(columnName));
+ }
+ }
+
+ if (bindVarName == null) {
+ bindVarName = columnNameToBindVariable(columnName);
+ }
+
+ sqlValues.append(bindVarName);
+
+ colCount++;
+ }
+
+ String sql =
+ String.format("insert %s into %s\n" + "(%s)\n" + "values\n"
+ + "(%s)\n", oracleHint, this.oracleTable.toString(), sqlNames
+ .toString(), sqlValues.toString());
+
+ LOG.info("Batch-Mode insert statement:\n" + sql);
+ return sql;
+ }
+
+ abstract void configurePreparedStatement(
+ PreparedStatement preparedStatement, List<SqoopRecord> userRecords)
+ throws SQLException;
+
+ private void setBindValueAtName(PreparedStatement statement,
+ String bindValueName, Object bindValue, OracleTableColumn column)
+ throws SQLException {
+ if (column.getOracleType()
+ == OraOopOracleQueries.getOracleType("NUMBER")) {
+ OraOopOracleQueries.setBigDecimalAtName(statement, bindValueName,
+ (BigDecimal) bindValue);
+ } else if (column.getOracleType() == OraOopOracleQueries
+ .getOracleType("VARCHAR")) {
+ OraOopOracleQueries.setStringAtName(statement, bindValueName,
+ (String) bindValue);
+ } else if (column.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMP")
+ || column.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMPTZ")
+ || column.getOracleType() == OraOopOracleQueries
+ .getOracleType("TIMESTAMPLTZ")) {
+ Object objValue = bindValue;
+ if (objValue instanceof Timestamp) {
+ Timestamp value = (Timestamp) objValue;
+ OraOopOracleQueries.setTimestampAtName(statement, bindValueName,
+ value);
+ } else {
+ String value = (String) objValue;
+
+ if (value == null || value.equalsIgnoreCase("null")) {
+ value = "";
+ }
+
+ OraOopOracleQueries.setStringAtName(statement, bindValueName, value);
+ }
+ } else if (column.getOracleType() == OraOopOracleQueries
+ .getOracleType("BINARY_DOUBLE")) {
+ Double value = (Double) bindValue;
+ if (value != null) {
+ OraOopOracleQueries.setBinaryDoubleAtName(statement, bindValueName,
+ value);
+ } else {
+ OraOopOracleQueries.setObjectAtName(statement, bindValueName, null);
+ }
+ } else if (column.getOracleType() == OraOopOracleQueries
+ .getOracleType("BINARY_FLOAT")) {
+ Float value = (Float) bindValue;
+ if (value != null) {
+ OraOopOracleQueries.setBinaryFloatAtName(statement, bindValueName,
+ value);
+ } else {
+ OraOopOracleQueries.setObjectAtName(statement, bindValueName, null);
+ }
+ } else if (column.getOracleType() == OraOopOracleQueries
+ .getOracleType("STRUCT")) { // <- E.g. URITYPE
+ if (column.getDataType().equals(OraOopConstants.Oracle.URITYPE)) {
+ String value = (String) bindValue;
+ OraOopOracleQueries.setStringAtName(statement, bindValueName, value);
+ } else {
+ String msg =
+ String.format(
+ "%s needs to be updated to cope with the data-type: %s "
+ + "where the Oracle data_type is \"%s\".",
+ OraOopUtilities.getCurrentMethodName(), column.getDataType(),
+ column.getOracleType());
+ LOG.error(msg);
+ throw new UnsupportedOperationException(msg);
+ }
+ } else {
+ // LOB data-types are currently not supported during
+ // a Sqoop Export.
+ // JIRA: SQOOP-117
+ // OraOopConstants.SUPPORTED_EXPORT_ORACLE_DATA_TYPES_CLAUSE
+ // will already have excluded all LOB columns.
+
+ // case oracle.jdbc.OracleTypes.CLOB:
+ // {
+ // oracle.sql.CLOB clob = new
+ // oracle.sql.CLOB(connection);
+ // Object value = fieldMap.get(colName);
+ // //clob.set
+ // statement.setCLOBAtName(bindValueName, clob);
+ // break;
+ // }
+ String msg =
+ String.format(
+ "%s may need to be updated to cope with the data-type: %s",
+ OraOopUtilities.getCurrentMethodName(), column.getOracleType());
+ LOG.debug(msg);
+
+ OraOopOracleQueries
+ .setObjectAtName(statement, bindValueName, bindValue);
+ }
+ }
+
+ protected void configurePreparedStatementColumns(
+ PreparedStatement statement, Map<String, Object> fieldMap)
+ throws SQLException {
+
+ String bindValueName;
+
+ if (this.tableHasMapperRowNumberColumn) {
+ bindValueName =
+ columnNameToBindVariable(
+ OraOopConstants.COLUMN_NAME_EXPORT_MAPPER_ROW).replaceFirst(
+ ":", "");
+ try {
+ OraOopOracleQueries.setLongAtName(statement, bindValueName,
+ this.mapperRowNumber);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.mapperRowNumber++;
+ }
+
+ Iterator<String> columnNameIterator = fieldMap.keySet().iterator();
+ while (columnNameIterator.hasNext()) {
+ String colName = columnNameIterator.next();
+ bindValueName = columnNameToBindVariable(colName).replaceFirst(":", "");
+
+ OracleTableColumn oracleTableColumn =
+ oracleTableColumns.findColumnByName(colName);
+ setBindValueAtName(statement, bindValueName, fieldMap.get(colName),
+ oracleTableColumn);
+ }
+ statement.addBatch();
+ }
+
+ abstract String getBatchSqlStatement();
+
+ protected String columnNameToBindVariable(String columnName) {
+
+ return ":" + columnName;
+ }
+
+ @Override
+ public void write(K key, V value) throws InterruptedException, IOException {
+
+ try {
+ super.write(key, value);
+ } catch (IOException ex) {
+ // This IOException may contain a SQLException that occurred
+ // during the batch insert...
+ showSqlBatchErrorDetails(ex);
+ throw ex;
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+
+ try {
+ super.close(context);
+ } catch (IOException ex) {
+ // This IOException may contain a SQLException that occurred
+ // during the batch insert...
+ showSqlBatchErrorDetails(ex);
+ throw ex;
+ }
+ }
+
+ private void showSqlBatchErrorDetails(Exception exception) {
+
+ if (OraOopUtilities.oracleSessionHasBeenKilled(exception)) {
+ LOG.info("\n*********************************************************"
+ + "\nThe Oracle session in use has been killed by a 3rd party."
+ + "\n*********************************************************");
+ }
+
+ /*
+ * Unfortunately, BatchUpdateException.getUpdateCounts() only returns
+ * information about UPDATE statements (not INSERT) statements. Since
+ * we're only performing INSERT statements, there's no extra information
+ * we can provide to the user at this point.
+ */
+
+ // if(exception == null)
+ // return;
+ //
+ // if(exception instanceof BatchUpdateException) {
+ // BatchUpdateException ex = (BatchUpdateException)exception;
+ //
+ // int[] updateCounts = ex.getUpdateCounts();
+ // LOG.error("The number of successful updates was: " +
+ // updateCounts.length);
+ //
+ // // Recurse for chained exceptions...
+ // SQLException nextEx = ex.getNextException();
+ // while(nextEx != null) {
+ // showSqlBatchErrorDetails(nextEx);
+ // nextEx = nextEx.getNextException();
+ // }
+ // }
+ //
+ // // Recurse for nested exceptions...
+ // Throwable cause = exception.getCause();
+ // if(cause instanceof Exception)
+ // showSqlBatchErrorDetails((Exception)cause);
+
+ }
+
+ protected Object getJobSysDate(TaskAttemptContext context) {
+
+ Configuration conf = context.getConfiguration();
+ return OraOopUtilities.recallOracleDateTime(conf,
+ OraOopConstants.ORAOOP_JOB_SYSDATE);
+ }
+
+ protected OracleTable createUniqueMapperTable(TaskAttemptContext context)
+ throws SQLException {
+
+ // This mapper inserts data into a unique table before either:
+ // - exchanging it into a subpartition of the 'real' export table; or
+ // - merging it into the 'real' export table.
+
+ Configuration conf = context.getConfiguration();
+
+ Object sysDateTime = getJobSysDate(context);
+
+ String schema = conf.get(OraOopConstants.ORAOOP_TABLE_OWNER);
+ String localTableName = conf.get(OraOopConstants.ORAOOP_TABLE_NAME);
+
+ OracleTable templateTable = new OracleTable(schema, localTableName);
+
+ OracleTable mapperTable =
+ OraOopUtilities.generateExportTableMapperTableName(this.mapperId,
+ sysDateTime, null);
+
+ // If this mapper is being reattempted in response to a failure, we need
+ // to delete the
+ // temporary table created by the previous attempt...
+ OraOopOracleQueries.dropTable(this.getConnection(), mapperTable);
+
+ String temporaryTableStorageClause =
+ OraOopUtilities.getTemporaryTableStorageClause(conf);
+
+ OraOopOracleQueries.createExportTableForMapper(this.getConnection(),
+ mapperTable, temporaryTableStorageClause, templateTable
+ , false); // <- addOraOopPartitionColumns
+
+ LOG.debug(String.format("Created temporary mapper table %s", mapperTable
+ .toString()));
+
+ return mapperTable;
+ }
+
+ protected String generateInsertValueForPseudoColumn(String columnName) {
+
+ if (columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_PARTITION)) {
+
+ String partitionValueStr =
+ this.getConf().get(
+ OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE, null);
+ if (partitionValueStr == null) {
+ throw new RuntimeException(
+ "Unable to recall the value of the partition date-time.");
+ }
+
+ return String.format("to_date('%s', '%s')", partitionValueStr,
+ OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
+ }
+
+ if (columnName
+ .equalsIgnoreCase(OraOopConstants.COLUMN_NAME_EXPORT_SUBPARTITION)) {
+ return Integer.toString(this.mapperId);
+ }
+
+ return null;
+ }
+
+ protected void logBatchSettings() {
+
+ LOG.info(String.format("The number of rows per batch is: %d",
+ this.rowsPerStmt));
+
+ int stmtsPerTx =
+ this.getConf().getInt(
+ AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY,
+ AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION);
+
+ LOG.info(String.format("The number of batches per commit is: %d",
+ stmtsPerTx));
+ }
+
+ }
+
+}