You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/06/29 18:03:15 UTC
[4/7] SQOOP-1287: Add high performance Oracle connector into Sqoop
(David Robson via Venkat Ranganathan)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatInsert.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatInsert.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatInsert.java
new file mode 100644
index 0000000..d5eebf4
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatInsert.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Insert into an Oracle table based on emitted keys.
+ */
+public class OraOopOutputFormatInsert<K extends SqoopRecord, V> extends
+ OraOopOutputFormatBase<K, V> {
+
+ private static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopOutputFormatInsert.class);
+
+ /**
+ * Type of insert to use - direct or partition exchange load.
+ */
+ public enum InsertMode {
+ DirectInsert, ExchangePartition
+ }
+
+ @Override
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ OraOopUtilities.checkJavaSecurityEgd();
+ Configuration conf = context.getConfiguration();
+
+ int mapperId = this.getMapperId(context);
+ applyMapperJdbcUrl(context, mapperId);
+
+ // Is each mapper inserting rows into a unique table?...
+ InsertMode insertMode = OraOopUtilities.getExportInsertMode(conf);
+
+ // Should we use the APPEND_VALUES Oracle hint?...
+ boolean useAppendValuesOracleHint = false;
+ if (insertMode == InsertMode.ExchangePartition) {
+ // NB: "Direct inserts" cannot utilize APPEND_VALUES, otherwise Oracle
+ // will serialize
+ // the N mappers, causing a lot of lock contention.
+ useAppendValuesOracleHint = this.canUseOracleAppendValuesHint(context);
+ }
+
+ // Has the user forced the use of APPEND_VALUES either on or off?...
+ useAppendValuesOracleHint =
+ allowUserToOverrideUseOfTheOracleAppendValuesHint(context,
+ useAppendValuesOracleHint);
+
+ // If using APPEND_VALUES, check the batch size and commit frequency...
+ if (useAppendValuesOracleHint) {
+ updateBatchSizeInConfigurationToAllowOracleAppendValuesHint(context);
+ }
+
+ // Create the Record Writer...
+ OraOopDBRecordWriterInsert result = null;
+ try {
+ result =
+ new OraOopDBRecordWriterInsert(context, mapperId, insertMode,
+ useAppendValuesOracleHint);
+ } catch (NoClassDefFoundError ex) {
+ throw new IOException(String.format(
+ "Unable to create an instance of OraOopDBRecordWriterInsert.\n"
+ + "The classpath is:\n%s", OraOopUtilities.getJavaClassPath()),
+ ex);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+
+ try {
+ result.getExportTableAndColumns(context);
+ } catch (SQLException ex) {
+ throw new IOException(ex);
+ }
+
+ return result;
+ }
+
+ /**
+ * Insert into an Oracle table based on emitted keys.
+ */
+ public class OraOopDBRecordWriterInsert extends OraOopDBRecordWriterBase {
+
+ private String sqlStatement; // <- The SQL used when inserting batches of
+ // rows into the Oracle table
+ private InsertMode insertMode; // <- The modus operandi of this class. i.e.
+ // Whether we insert into the Oracle table
+ // directly, or insert data into a separate
+ // table and then perform an EXCHANGE
+ // PARTITION statement.
+ private boolean useAppendValuesOracleHint; // <- Whether to use the
+ // " /*+APPEND_VALUES*/ " hint
+ // within the Oracle SQL
+ // statement we generate
+ private String subPartitionName; // <- The name of the subpartition in the
+ // "main table" that this mappers unique
+ // table will be exchanged with
+
+ public OraOopDBRecordWriterInsert(TaskAttemptContext context, int mapperId,
+ InsertMode insertMode, boolean useAppendValuesOracleHint)
+ throws ClassNotFoundException, SQLException {
+
+ super(context, mapperId);
+ this.insertMode = insertMode;
+ this.useAppendValuesOracleHint = useAppendValuesOracleHint;
+ }
+
+ @Override
+ protected void getExportTableAndColumns(TaskAttemptContext context)
+ throws SQLException {
+
+ Configuration conf = context.getConfiguration();
+
+ switch (this.insertMode) {
+
+ case DirectInsert:
+ super.getExportTableAndColumns(context);
+ break;
+
+ case ExchangePartition:
+ // This mapper inserts data into a unique table before exchanging it
+ // into
+ // a subpartition of the 'real' export table...
+
+ this.oracleTable = createUniqueMapperTable(context);
+ setOracleTableColumns(OraOopOracleQueries.getTableColumns(this
+ .getConnection(), this.oracleTable, OraOopUtilities
+ .omitLobAndLongColumnsDuringImport(conf), OraOopUtilities
+ .recallSqoopJobType(conf), true // <- onlyOraOopSupportedTypes
+ , false) // <- omitOraOopPseudoColumns
+ );
+
+ this.subPartitionName =
+ OraOopUtilities.generateExportTableSubPartitionName(
+ this.mapperId, this.getJobSysDate(context), conf);
+
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the insertMode \"%s\".", OraOopUtilities
+ .getCurrentMethodName(), this.insertMode.toString()));
+ }
+
+ }
+
+ @Override
+ public void closeConnection(TaskAttemptContext context)
+ throws SQLException {
+
+ // If this mapper is inserting data into a unique table, we'll now
+ // move this data into the main export table...
+ if (this.insertMode == InsertMode.ExchangePartition) {
+
+ // Perform an "exchange subpartition" operation on the "main table"
+ // to convert this table into a subpartition of the "main table"...
+ exchangePartitionUniqueMapperTableDataIntoMainExportTable(context);
+
+ LOG.debug(String.format("Dropping temporary mapper table %s",
+ this.oracleTable.toString()));
+ OraOopOracleQueries.dropTable(this.getConnection(), this.oracleTable);
+ }
+
+ super.closeConnection(context);
+ }
+
+ private void exchangePartitionUniqueMapperTableDataIntoMainExportTable(
+ TaskAttemptContext context) throws SQLException {
+
+ String schema =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
+ String localTableName =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
+ OracleTable mainTable = new OracleTable(schema, localTableName);
+
+ try {
+ long start = System.nanoTime();
+
+ OraOopOracleQueries.exchangeSubpartition(this.getConnection(),
+ mainTable, this.subPartitionName, this.oracleTable);
+
+ double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String
+ .format(
+ "Time spent performing an \"exchange subpartition with "
+ + "table\": %f sec.",
+ timeInSec));
+ } catch (SQLException ex) {
+ throw new SQLException(
+ String
+ .format(
+ "Unable to perform an \"exchange subpartition\" operation "
+ + "for the table %s, for the subpartition named "
+ + "\"%s\" with the table named \"%s\".",
+ mainTable.toString(), this.subPartitionName,
+ this.oracleTable.toString()), ex);
+ }
+ }
+
+ @Override
+ protected String getBatchSqlStatement() {
+
+ if (sqlStatement == null) {
+ this.sqlStatement =
+ getBatchInsertSqlStatement(this.useAppendValuesOracleHint
+ ? "/*+APPEND_VALUES*/" : "");
+ }
+
+ return this.sqlStatement;
+ }
+
+ @Override
+ void configurePreparedStatement(PreparedStatement statement,
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ Map<String, Object> fieldMap;
+ try {
+ for (SqoopRecord record : userRecords) {
+ fieldMap = record.getFieldMap();
+
+ configurePreparedStatementColumns(statement, fieldMap);
+ }
+
+ } catch (Exception ex) {
+ if (ex instanceof SQLException) {
+ throw (SQLException) ex;
+ } else {
+ LOG.error(String.format("The following error occurred during %s",
+ OraOopUtilities.getCurrentMethodName()), ex);
+ throw new SQLException(ex);
+ }
+ }
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java
new file mode 100644
index 0000000..a33768f
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOutputFormatUpdate.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.manager.oracle.OraOopOracleQueries.
+ CreateExportChangesTableOptions;
+
+/**
+ * Update an Oracle table based on emitted keys.
+ */
+public class OraOopOutputFormatUpdate<K extends SqoopRecord, V> extends
+ OraOopOutputFormatBase<K, V> {
+
+ private static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopOutputFormatUpdate.class);
+
+ /**
+ * Type of export - straight update or merge (update-insert).
+ */
+ public enum UpdateMode {
+ Update, Merge
+ }
+
+ private enum ExportTableUpdateTechnique {
+ ReInsertUpdatedRows, ReInsertUpdatedRowsAndNewRows, UpdateSql, MergeSql
+ }
+
+ @Override
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ OraOopUtilities.checkJavaSecurityEgd();
+ Configuration conf = context.getConfiguration();
+
+ // Get the unique JDBC URL to use for this mapper and update the
+ // configuration property
+ // so that the URL is actually used...
+ int mapperId = this.getMapperId(context);
+ applyMapperJdbcUrl(context, mapperId);
+
+ UpdateMode updateMode = OraOopUtilities.getExportUpdateMode(conf);
+
+ boolean useAppendValuesOracleHint = false;
+
+ if (updateMode == UpdateMode.Merge || updateMode == UpdateMode.Update) {
+ // Should we use the APPEND_VALUES Oracle hint?...
+ useAppendValuesOracleHint = this.canUseOracleAppendValuesHint(context);
+ }
+
+ // Has the user forced the use of APPEND_VALUES either on or off?...
+ useAppendValuesOracleHint =
+ allowUserToOverrideUseOfTheOracleAppendValuesHint(context,
+ useAppendValuesOracleHint);
+
+ // If using APPEND_VALUES, check the batch size and commit frequency...
+ if (useAppendValuesOracleHint) {
+ updateBatchSizeInConfigurationToAllowOracleAppendValuesHint(context);
+ }
+
+ // Create the Record Writer...
+ OraOopDBRecordWriterUpdate result = null;
+ try {
+ result =
+ new OraOopDBRecordWriterUpdate(context, mapperId, updateMode,
+ useAppendValuesOracleHint);
+ } catch (NoClassDefFoundError ex) {
+ throw new IOException(String.format(
+ "Unable to create an instance of OraOopDBRecordWriterUpdate.\n"
+ + "The classpath is:\n%s", OraOopUtilities.getJavaClassPath()),
+ ex);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+
+ try {
+ result.getExportTableAndColumns(context);
+ } catch (SQLException ex) {
+ throw new IOException(ex);
+ }
+
+ return result;
+ }
+
+ /**
+ * Update an Oracle table based on emitted keys.
+ */
+ public class OraOopDBRecordWriterUpdate extends OraOopDBRecordWriterBase {
+
+ private String sqlStatement; // <- The SQL used when updating batches of
+ // rows into the Oracle table
+ private String[] updateColumnNames; // <- The name of the column(s) used to
+ // match a row in the HDFS file to a row
+ // in the Oracle table. i.e. What as
+ // specified in the "--update-key" sqoop
+ // argument.
+ private UpdateMode updateMode; // <- The modus operandi of this class. i.e.
+ // Whether we update the Oracle table
+ // directly, or insert data into a separate
+ // table and then apply a SQL MERGE
+ // statement.
+ private boolean useAppendValuesOracleHint; // <- Whether to use the
+ // " /*+APPEND_VALUES*/ " hint
+ // within the Oracle SQL
+ // statement we generate
+ private boolean tableHasOraOopPartitions; // <- Indicates whether the export
+ // table has partitions that were
+ // creted by OraOop
+ private long numberOfRowsSkipped; // <- The number of rows encountered
+ // during configurePreparedStatement()
+ // that had a NULL value for (one of) the
+ // update columns. This row was therefore
+ // skipped.
+
+ public OraOopDBRecordWriterUpdate(TaskAttemptContext context, int mapperId,
+ UpdateMode updateMode, boolean useAppendValuesOracleHint)
+ throws ClassNotFoundException, SQLException {
+
+ super(context, mapperId);
+
+ Configuration conf = context.getConfiguration();
+
+ this.updateColumnNames =
+ OraOopUtilities.getExportUpdateKeyColumnNames(conf);
+ this.useAppendValuesOracleHint = useAppendValuesOracleHint;
+ this.updateMode = updateMode;
+ this.tableHasOraOopPartitions =
+ conf.getBoolean(OraOopConstants.EXPORT_TABLE_HAS_ORAOOP_PARTITIONS,
+ false);
+ }
+
+ @Override
+ protected void getExportTableAndColumns(TaskAttemptContext context)
+ throws SQLException {
+
+ Configuration conf = context.getConfiguration();
+
+ this.oracleTable = createUniqueMapperTable(context);
+ setOracleTableColumns(OraOopOracleQueries.getTableColumns(this
+ .getConnection(), this.oracleTable, OraOopUtilities
+ .omitLobAndLongColumnsDuringImport(conf), OraOopUtilities
+ .recallSqoopJobType(conf), true // <- onlyOraOopSupportedTypes
+ , false) // <- omitOraOopPseudoColumns
+ );
+ }
+
+ @Override
+ public void closeConnection(TaskAttemptContext context)
+ throws SQLException {
+
+ try {
+
+ if (this.numberOfRowsSkipped > 0) {
+ LOG.warn(String.format(
+ "%d records were skipped due to a NULL value within one of the "
+ + "update-key column(s).\nHaving a NULL value prevents a record "
+ + "from being able to be matched to a row in the Oracle table.",
+ this.numberOfRowsSkipped));
+ }
+
+ // Now update the "main" export table with data that was inserted into
+ // this mapper's table...
+ updateMainExportTableFromUniqueMapperTable(context,
+ this.updateColumnNames);
+
+ LOG.debug(String.format("Dropping temporary mapper table %s",
+ this.oracleTable.toString()));
+ OraOopOracleQueries.dropTable(this.getConnection(), this.oracleTable);
+ } finally {
+ super.closeConnection(context);
+ }
+ }
+
+ private ExportTableUpdateTechnique getExportTableUpdateTechnique() {
+
+ ExportTableUpdateTechnique result;
+
+ if (this.tableHasOraOopPartitions) {
+ switch (this.updateMode) {
+
+ case Update:
+ result = ExportTableUpdateTechnique.ReInsertUpdatedRows;
+ break;
+
+ case Merge:
+ result = ExportTableUpdateTechnique.ReInsertUpdatedRowsAndNewRows;
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the updateMode \"%s\".",
+ OraOopUtilities.getCurrentMethodName(), this.updateMode
+ .toString()));
+ }
+ } else {
+ switch (this.updateMode) {
+
+ case Update:
+ result = ExportTableUpdateTechnique.UpdateSql;
+ break;
+
+ case Merge:
+ result = ExportTableUpdateTechnique.MergeSql;
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the updateMode \"%s\".",
+ OraOopUtilities.getCurrentMethodName(), this.updateMode
+ .toString()));
+ }
+ }
+
+ return result;
+ }
+
+ private void updateMainExportTableFromUniqueMapperTable(
+ TaskAttemptContext context, String[] mergeColumnNames)
+ throws SQLException {
+
+ String schema =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_OWNER);
+ String localTableName =
+ context.getConfiguration().get(OraOopConstants.ORAOOP_TABLE_NAME);
+ OracleTable targetTable = new OracleTable(schema, localTableName);
+
+ Object sysDateTime = getJobSysDate(context);
+ OracleTable changesTable =
+ OraOopUtilities.generateExportTableMapperTableName(Integer
+ .toString(this.mapperId)
+ + "_CHG", sysDateTime, null);
+
+ OraOopOracleQueries.CreateExportChangesTableOptions changesTableOptions;
+ boolean parallelizationEnabled =
+ OraOopUtilities.enableOracleParallelProcessingDuringExport(context
+ .getConfiguration());
+
+ ExportTableUpdateTechnique exportTableUpdateTechnique =
+ getExportTableUpdateTechnique();
+ switch (exportTableUpdateTechnique) {
+
+ case ReInsertUpdatedRows:
+ case UpdateSql:
+ changesTableOptions =
+ CreateExportChangesTableOptions.OnlyRowsThatDiffer;
+ break;
+
+ case ReInsertUpdatedRowsAndNewRows:
+ case MergeSql:
+ changesTableOptions =
+ CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+ OraOopUtilities.getCurrentMethodName(),
+ exportTableUpdateTechnique.toString()));
+ }
+
+ String temporaryTableStorageClause =
+ OraOopUtilities.getTemporaryTableStorageClause(context
+ .getConfiguration());
+
+ try {
+ int changeTableRowCount =
+ OraOopOracleQueries.createExportChangesTable(this.getConnection(),
+ changesTable, temporaryTableStorageClause, this.oracleTable,
+ targetTable, this.updateColumnNames, changesTableOptions,
+ parallelizationEnabled);
+
+ if (changeTableRowCount == 0) {
+ LOG.debug(String.format(
+ "The changes-table does not contain any rows. %s is now exiting.",
+ OraOopUtilities.getCurrentMethodName()));
+ return;
+ }
+
+ switch (exportTableUpdateTechnique) {
+
+ case ReInsertUpdatedRows:
+ case ReInsertUpdatedRowsAndNewRows:
+
+ OraOopOracleQueries.deleteRowsFromTable(this.getConnection(),
+ targetTable, changesTable, this.updateColumnNames,
+ parallelizationEnabled);
+
+ OraOopOracleQueries.insertRowsIntoExportTable(this.getConnection(),
+ targetTable, changesTable, sysDateTime, this.mapperId,
+ parallelizationEnabled);
+ break;
+
+ case UpdateSql:
+
+ long start = System.nanoTime();
+
+ OraOopOracleQueries.updateTable(this.getConnection(), targetTable,
+ changesTable, this.updateColumnNames, this
+ .getOracleTableColumns(), sysDateTime, this.mapperId,
+ parallelizationEnabled);
+
+ double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String.format("Time spent performing an update: %f sec.",
+ timeInSec));
+ break;
+
+ case MergeSql:
+
+ long mergeStart = System.nanoTime();
+
+ OraOopOracleQueries.mergeTable(this.getConnection(), targetTable,
+ changesTable, this.updateColumnNames, this
+ .getOracleTableColumns(), sysDateTime, this.mapperId,
+ parallelizationEnabled);
+
+ double mergeTimeInSec = (System.nanoTime() - mergeStart)
+ / Math.pow(10, 9);
+ LOG.info(String.format("Time spent performing a merge: %f sec.",
+ mergeTimeInSec));
+
+ break;
+
+ default:
+ throw new RuntimeException(
+ String.format(
+ "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+ OraOopUtilities.getCurrentMethodName(),
+ exportTableUpdateTechnique.toString()));
+ }
+
+ this.getConnection().commit();
+ } catch (SQLException ex) {
+ this.getConnection().rollback();
+ throw ex;
+ } finally {
+ OraOopOracleQueries.dropTable(this.getConnection(), changesTable);
+ }
+ }
+
+ @Override
+ protected String getBatchSqlStatement() {
+
+ if (sqlStatement == null) {
+ this.sqlStatement =
+ getBatchInsertSqlStatement(
+ this.useAppendValuesOracleHint ? "/*+APPEND_VALUES*/"
+ : "");
+ }
+
+ return this.sqlStatement;
+ }
+
+ @Override
+ void configurePreparedStatement(PreparedStatement statement,
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ Map<String, Object> fieldMap;
+ try {
+ for (SqoopRecord record : userRecords) {
+ fieldMap = record.getFieldMap();
+
+ boolean updateKeyValueIsNull = false;
+ for (int idx = 0; idx < this.updateColumnNames.length; idx++) {
+ String updateColumnName = this.updateColumnNames[idx];
+ Object updateKeyValue = fieldMap.get(updateColumnName);
+ if (updateKeyValue == null) {
+ this.numberOfRowsSkipped++;
+ updateKeyValueIsNull = true;
+ break;
+ }
+ }
+
+ if (updateKeyValueIsNull) {
+ continue;
+ }
+
+ configurePreparedStatementColumns(statement, fieldMap);
+ }
+
+ } catch (Exception ex) {
+ if (ex instanceof SQLException) {
+ throw (SQLException) ex;
+ } else {
+ LOG.error(String.format("The following error occurred during %s",
+ OraOopUtilities.getCurrentMethodName()), ex);
+ throw new SQLException(ex);
+ }
+ }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
new file mode 100644
index 0000000..473a5ae
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
@@ -0,0 +1,1461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Category;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+
+import org.apache.sqoop.manager.oracle.OraOopOutputFormatInsert.InsertMode;
+import org.apache.sqoop.manager.oracle.OraOopOutputFormatUpdate.UpdateMode;
+
+/**
+ * Utilities used by OraOop.
+ */
+public final class OraOopUtilities {
+
+ private OraOopUtilities() {
+ }
+
+ /**
+ * Used for testing purposes - can get OraOop to call a class to run a report
+ * on various performance metrics.
+ */
+ public static class OraOopStatsReports {
+ private String csvReport;
+ private String performanceReport;
+
+ public String getPerformanceReport() {
+ return performanceReport;
+ }
+
+ public void setPerformanceReport(String newPerformanceReport) {
+ this.performanceReport = newPerformanceReport;
+ }
+
+ public String getCsvReport() {
+ return csvReport;
+ }
+
+ public void setCsvReport(String newCsvReport) {
+ this.csvReport = newCsvReport;
+ }
+ }
+
+ protected static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OraOopUtilities.class.getName());
+
+ public static List<String> copyStringList(List<String> list) {
+
+ List<String> result = new ArrayList<String>(list.size());
+ result.addAll(list);
+ return result;
+ }
+
+ public static OracleTable decodeOracleTableName(
+ String oracleConnectionUserName, String tableStr) {
+
+ String tableOwner;
+ String tableName;
+
+ // These are the possibilities for double-quote location...
+ // table
+ // "table"
+ // schema.table
+ // schema."table"
+ // "schema".table
+ // "schema"."table"
+ String[] tableStrings = tableStr.split("\"");
+
+ switch (tableStrings.length) {
+
+ case 1: // <- table or schema.table
+
+ tableStrings = tableStr.split("\\.");
+
+ switch (tableStrings.length) {
+
+ case 1: // <- No period
+ tableOwner = oracleConnectionUserName.toUpperCase();
+ tableName = tableStrings[0].toUpperCase();
+ break;
+ case 2: // <- 1 period
+ tableOwner = tableStrings[0].toUpperCase();
+ tableName = tableStrings[1].toUpperCase();
+ break;
+ default:
+ LOG.debug(String.format(
+ "Unable to decode the table name (displayed in "
+ + "double quotes): \"%s\"", tableStr));
+ throw new RuntimeException(String.format(
+ "Unable to decode the table name: %s", tableStr));
+ }
+ break;
+
+ case 2: // <- "table" or schema."table"
+
+ if (tableStrings[0] == null || tableStrings[0].isEmpty()) {
+ tableOwner = oracleConnectionUserName.toUpperCase();
+ } else {
+ tableOwner = tableStrings[0].toUpperCase();
+ // Remove the "." from the end of the schema name...
+ if (tableOwner.endsWith(".")) {
+ tableOwner = tableOwner.substring(0, tableOwner.length() - 1);
+ }
+ }
+
+ tableName = tableStrings[1];
+ break;
+
+ case 3: // <- "schema".table
+
+ tableOwner = tableStrings[1];
+ tableName = tableStrings[2].toUpperCase();
+ // Remove the "." from the start of the table name...
+ if (tableName.startsWith(".")) {
+ tableName = tableName.substring(1, tableName.length());
+ }
+
+ break;
+
+ case 4: // <- "schema"."table"
+ tableOwner = tableStrings[1];
+ tableName = tableStrings[3];
+ break;
+
+ default:
+ LOG.debug(String.format(
+ "Unable to decode the table name (displayed in double "
+ + "quotes): \"%s\"", tableStr));
+ throw new RuntimeException(String.format(
+ "Unable to decode the table name: %s", tableStr));
+
+ }
+ OracleTable result = new OracleTable(tableOwner, tableName);
+ return result;
+ }
+
+ public static OracleTable decodeOracleTableName(
+ String oracleConnectionUserName, String tableStr,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ OracleTable result = new OracleTable();
+
+ // Have we already determined the answer to this question?...
+ if (conf != null) {
+ String tableOwner = conf.get(OraOopConstants.ORAOOP_TABLE_OWNER);
+ String tableName = conf.get(OraOopConstants.ORAOOP_TABLE_NAME);
+ result = new OracleTable(tableOwner, tableName);
+ }
+
+ // If we couldn't look up the answer, then determine it now...
+ if (result.getSchema() == null || result.getName() == null) {
+
+ result = decodeOracleTableName(oracleConnectionUserName, tableStr);
+
+ LOG.debug(String.format(
+ "The Oracle table context has been derived from:\n"
+ + "\toracleConnectionUserName = %s\n" + "\ttableStr = %s\n"
+ + "\tas:\n" + "\towner : %s\n" + "\ttable : %s",
+ oracleConnectionUserName, tableStr, result.getSchema(), result
+ .getName()));
+
+ // Save the answer for next time...
+ if (conf != null) {
+ conf.set(OraOopConstants.ORAOOP_TABLE_OWNER, result.getSchema());
+ conf.set(OraOopConstants.ORAOOP_TABLE_NAME, result.getName());
+ }
+ }
+
+ return result;
+ }
+
+ public static boolean oracleJdbcUrlGenerationDisabled(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getBoolean(OraOopConstants.ORAOOP_JDBC_URL_VERBATIM, false);
+ }
+
+ public static boolean userWantsOracleSessionStatisticsReports(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getBoolean(OraOopConstants.ORAOOP_REPORT_SESSION_STATISTICS,
+ false);
+ }
+
+ public static boolean enableDebugLoggingIfRequired(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ boolean result = false;
+
+ try {
+
+ Level desiredOraOopLoggingLevel =
+ Level.toLevel(conf.get(OraOopConstants.ORAOOP_LOGGING_LEVEL),
+ Level.INFO);
+
+ Level sqoopLogLevel =
+ Logger.getLogger(Sqoop.class.getName()).getParent().getLevel();
+
+ if (desiredOraOopLoggingLevel == Level.DEBUG
+ || desiredOraOopLoggingLevel == Level.ALL
+ || sqoopLogLevel == Level.DEBUG || sqoopLogLevel == Level.ALL) {
+
+ Category oraOopLogger =
+ Logger.getLogger(OraOopManagerFactory.class.getName()).getParent();
+ oraOopLogger.setLevel(Level.DEBUG);
+ LOG.debug("Enabled OraOop debug logging.");
+ result = true;
+
+ conf.set(OraOopConstants.ORAOOP_LOGGING_LEVEL, Level.DEBUG.toString());
+ }
+ } catch (Exception ex) {
+ LOG.error(String.format(
+ "Unable to determine whether debug logging should be enabled.\n%s",
+ getFullExceptionMessage(ex)));
+ }
+
+ return result;
+ }
+
+ public static String generateDataChunkId(int fileId, int fileBatch) {
+ StringBuilder sb = new StringBuilder();
+ return sb.append(fileId).append("_").append(fileBatch).toString();
+ }
+
+ public static String getCurrentMethodName() {
+
+ StackTraceElement[] stackTraceElements = (new Throwable()).getStackTrace();
+ return String.format("%s()", stackTraceElements[1].getMethodName());
+ }
+
+ public static String[] getDuplicatedStringArrayValues(String[] list,
+ boolean ignoreCase) {
+
+ if (list == null) {
+ throw new IllegalArgumentException("The list argument cannot be null");
+ }
+
+ ArrayList<String> duplicates = new ArrayList<String>();
+
+ for (int idx1 = 0; idx1 < list.length - 1; idx1++) {
+ for (int idx2 = idx1 + 1; idx2 < list.length; idx2++) {
+ if (list[idx1].equals(list[idx2])) {
+ // If c is a duplicate of both a & b, don't add c to the list twice...
+ if (!duplicates.contains(list[idx2])) {
+ duplicates.add(list[idx2]);
+ }
+
+ } else if (ignoreCase && list[idx1].equalsIgnoreCase((list[idx2]))) {
+ // If c is a duplicate of both a & b, don't add c to the list twice...
+ if (stringListIndexOf(duplicates, list[idx2], ignoreCase) == -1) {
+ duplicates.add(list[idx2]);
+ }
+ }
+ }
+ }
+
+ return duplicates.toArray(new String[duplicates.size()]);
+ }
+
+ public static String getFullExceptionMessage(Exception ex) {
+
+ ByteArrayOutputStream arrayStream = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(arrayStream);
+ ex.printStackTrace(printStream);
+ return arrayStream.toString();
+ }
+
+ public static int getMinNumberOfImportMappersAcceptedByOraOop(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getInt(OraOopConstants.ORAOOP_MIN_IMPORT_MAPPERS,
+ OraOopConstants.MIN_NUM_IMPORT_MAPPERS_ACCEPTED_BY_ORAOOP);
+ }
+
+ public static int getMinAppendValuesBatchSize(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getInt(OraOopConstants.ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE,
+ OraOopConstants.ORAOOP_MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT);
+ }
+
+ public static int getMinNumberOfExportMappersAcceptedByOraOop(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getInt(OraOopConstants.ORAOOP_MIN_EXPORT_MAPPERS,
+ OraOopConstants.MIN_NUM_EXPORT_MAPPERS_ACCEPTED_BY_ORAOOP);
+ }
+
+ public static int getMinNumberOfOracleRacActiveInstancesForDynamicJdbcUrlUse(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getInt(OraOopConstants.ORAOOP_MIN_RAC_ACTIVE_INSTANCES,
+ OraOopConstants.MIN_NUM_RAC_ACTIVE_INSTANCES_FOR_DYNAMIC_JDBC_URLS);
+ }
+
  /**
   * Returns how many data-chunks should be generated per Oracle data-file
   * for the given number of mappers.
   *
   * @param desiredNumberOfMappers the number of mappers requested for the job
   * @param conf the job configuration (read for the two tuning properties
   *             below)
   * @return (desiredNumberOfMappers * multiplier) + increment; with the
   *         defaults this is (2 * mappers) + 1
   */
  public static int getNumberOfDataChunksPerOracleDataFile(
      int desiredNumberOfMappers, org.apache.hadoop.conf.Configuration conf) {

    // Undocumented tuning knobs; defaults yield (2 * mappers) + 1.
    final String MAPPER_MULTIPLIER = "oraoop.datachunk.mapper.multiplier";
    final String RESULT_INCREMENT = "oraoop.datachunk.result.increment";

    int numberToMultiplyMappersBy = conf.getInt(MAPPER_MULTIPLIER, 2);
    int numberToIncrementResultBy = conf.getInt(RESULT_INCREMENT, 1);

    // The number of chunks generated will *not* be a multiple of the number of
    // splits,
    // to ensure that each split doesn't always get data from the start of each
    // data-file...
    int numberOfDataChunksPerOracleDataFile =
        (desiredNumberOfMappers * numberToMultiplyMappersBy)
            + numberToIncrementResultBy;

    // NOTE: getCurrentMethodName() inspects the stack, so this call must stay
    // directly inside this method for the log tag to be correct.
    LOG.debug(String.format("%s:\n" + "\t%s=%d\n" + "\t%s=%d\n"
        + "\tdesiredNumberOfMappers=%d\n" + "\tresult=%d",
        getCurrentMethodName(), MAPPER_MULTIPLIER, numberToMultiplyMappersBy,
        RESULT_INCREMENT, numberToIncrementResultBy, desiredNumberOfMappers,
        numberOfDataChunksPerOracleDataFile));

    return numberOfDataChunksPerOracleDataFile;
  }
+
+ public static OraOopConstants.OraOopOracleDataChunkMethod
+ getOraOopOracleDataChunkMethod(Configuration conf) {
+ if (conf == null) {
+ throw new IllegalArgumentException("The conf argument cannot be null");
+ }
+
+ String strMethod =
+ conf.get(OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD);
+ if (strMethod == null) {
+ return OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT;
+ }
+
+ OraOopConstants.OraOopOracleDataChunkMethod result;
+
+ try {
+ strMethod = strMethod.toUpperCase().trim();
+ result = OraOopConstants.OraOopOracleDataChunkMethod.valueOf(strMethod);
+ } catch (IllegalArgumentException ex) {
+ result = OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT;
+ LOG.error("An invalid value of \"" + strMethod
+ + "\" was specified for the \""
+ + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD
+ + "\" configuration property value.\n" + "\tThe default value of "
+ + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD_DEFAULT
+ + " will be used.");
+ }
+ return result;
+ }
+
+ public static
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod
+ getOraOopOracleBlockToSplitAllocationMethod(
+ org.apache.hadoop.conf.Configuration conf,
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod
+ defaultMethod) {
+
+ if (conf == null) {
+ throw new IllegalArgumentException("The conf argument cannot be null");
+ }
+
+ String strMethod =
+ conf.get(
+ OraOopConstants.ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD);
+ if (strMethod == null) {
+ return defaultMethod;
+ }
+
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod result;
+
+ try {
+ strMethod = strMethod.toUpperCase().trim();
+ result =
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod
+ .valueOf(strMethod);
+ } catch (IllegalArgumentException ex) {
+ result = defaultMethod;
+
+ String errorMsg =
+ String
+ .format(
+ "An invalid value of \"%s\" was specified for the \"%s\" "
+ + "configuration property value.\n"
+ + "\tValid values are: %s\n"
+ + "\tThe default value of %s will be used.",
+ strMethod,
+ OraOopConstants.ORAOOP_ORACLE_BLOCK_TO_SPLIT_ALLOCATION_METHOD,
+ getOraOopOracleBlockToSplitAllocationMethods(), defaultMethod
+ .name());
+ LOG.error(errorMsg);
+ }
+
+ return result;
+ }
+
+ private static String getOraOopOracleBlockToSplitAllocationMethods() {
+
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod[] values =
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod.values();
+
+ StringBuilder result =
+ new StringBuilder((2 * values.length) - 1); // <- Include capacity
+ // for commas
+
+ for (int idx = 0; idx < values.length; idx++) {
+ OraOopConstants.OraOopOracleBlockToSplitAllocationMethod value =
+ values[idx];
+ if (idx > 0) {
+ result.append(" or ");
+ }
+ result.append(value.name());
+ }
+ return result.toString();
+ }
+
+ public static OraOopConstants.OraOopTableImportWhereClauseLocation
+ getOraOopTableImportWhereClauseLocation(
+ org.apache.hadoop.conf.Configuration conf,
+ OraOopConstants.OraOopTableImportWhereClauseLocation defaultLocation) {
+
+ if (conf == null) {
+ throw new IllegalArgumentException("The conf argument cannot be null");
+ }
+
+ String strLocation =
+ conf.get(OraOopConstants.ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION);
+ if (strLocation == null) {
+ return defaultLocation;
+ }
+
+ OraOopConstants.OraOopTableImportWhereClauseLocation result;
+
+ try {
+ strLocation = strLocation.toUpperCase().trim();
+ result =
+ OraOopConstants.OraOopTableImportWhereClauseLocation
+ .valueOf(strLocation);
+ } catch (IllegalArgumentException ex) {
+ result = defaultLocation;
+
+ String errorMsg =
+ String
+ .format(
+ "An invalid value of \"%s\"was specified for the \"%s\" "
+ + "configuration property value.\n"
+ + "\tValid values are: %s\n"
+ + "\tThe default value of %s will be used.", strLocation,
+ OraOopConstants.ORAOOP_TABLE_IMPORT_WHERE_CLAUSE_LOCATION,
+ getOraOopTableImportWhereClauseLocations(), defaultLocation
+ .name());
+ LOG.error(errorMsg);
+ }
+
+ return result;
+ }
+
+ private static String getOraOopTableImportWhereClauseLocations() {
+
+ OraOopConstants.OraOopTableImportWhereClauseLocation[] locationValues =
+ OraOopConstants.OraOopTableImportWhereClauseLocation.values();
+
+ StringBuilder result =
+ new StringBuilder((2 * locationValues.length) - 1); // <- Include
+ // capacity for
+ // commas
+
+ for (int idx = 0; idx < locationValues.length; idx++) {
+ OraOopConstants.OraOopTableImportWhereClauseLocation locationValue =
+ locationValues[idx];
+ if (idx > 0) {
+ result.append(" or ");
+ }
+ result.append(locationValue.name());
+ }
+ return result.toString();
+ }
+
  /**
   * Returns the directory where OraOop writes its report files, composed
   * from the job's "mapred.working.dir" and "mapred.output.dir" properties.
   *
   * NOTE(review): if either property is unset, conf.get returns null and the
   * result contains the literal text "null" — presumably the MapReduce
   * framework always sets both by the time this runs; confirm against
   * callers.
   */
  public static String getOutputDirectory(
      org.apache.hadoop.conf.Configuration conf) {

    String workingDir = conf.get("mapred.working.dir");
    String outputDir = conf.get("mapred.output.dir");

    return workingDir + "/" + outputDir;
  }
+
  /**
   * Left-pads s with spaces to width n; see Commons-Lang
   * StringUtils.leftPad for the exact padding and null semantics.
   */
  public static String padLeft(String s, int n) {
    return StringUtils.leftPad(s, n);
  }
+
  /**
   * Right-pads s with spaces to width n; see Commons-Lang
   * StringUtils.rightPad for the exact padding and null semantics.
   */
  public static String padRight(String s, int n) {
    return StringUtils.rightPad(s, n);
  }
+
+ public static String replaceConfigurationExpression(String str,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ int startPos = str.indexOf('{');
+ int endPos = str.indexOf('}');
+
+ // Example:
+ // alter session set timezone = '{oracle.sessionTimeZone|GMT}';
+
+ if (startPos == -1 || endPos == -1) {
+ return str;
+ }
+
+ String configName = null;
+ String defaultValue = null;
+
+ String expression = str.substring(startPos + 1, endPos);
+ int defaultValuePos = expression.indexOf('|');
+ if (defaultValuePos == -1) {
+ // return expression;
+ configName = expression;
+ } else {
+ configName = expression.substring(0, defaultValuePos);
+ defaultValue = expression.substring(defaultValuePos + 1);
+ }
+
+ if (defaultValue == null) {
+ defaultValue = "";
+ }
+
+ String configValue = conf.get(configName);
+ if (configValue == null) {
+ configValue = defaultValue;
+ }
+
+ String result = str.replace(String.format("{%s}", expression), configValue);
+
+ LOG.debug(String.format("The expression:\n%s\nwas replaced with:\n%s", str,
+ result));
+
+ // Recurse to evaluate any other expressions...
+ result = replaceConfigurationExpression(result, conf);
+
+ return result;
+ }
+
+ public static boolean stackContainsClass(String className) {
+
+ StackTraceElement[] stackTraceElements = (new Throwable()).getStackTrace();
+ for (StackTraceElement stackTraceElement : stackTraceElements) {
+ if (stackTraceElement.getClassName().equalsIgnoreCase(className)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public static Object startSessionSnapshot(Connection connection) {
+
+ Object result = null;
+ try {
+
+ Class<?> oraOopOraStatsClass =
+ Class.forName("quest.com.oraOop.oracleStats.OraOopOraStats");
+ Method startSnapshotMethod =
+ oraOopOraStatsClass.getMethod("startSnapshot", Connection.class);
+ if (connection != null) {
+ result = startSnapshotMethod.invoke(null, connection);
+ }
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ } catch (NoSuchMethodException ex) {
+ throw new RuntimeException(ex);
+ } catch (InvocationTargetException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return result;
+ }
+
+ public static OraOopStatsReports stopSessionSnapshot(Object oraOopOraStats) {
+
+ OraOopStatsReports result = new OraOopStatsReports();
+
+ if (oraOopOraStats == null) {
+ return result;
+ }
+
+ try {
+
+ Class<?> oraOopOraStatsClass =
+ Class.forName("quest.com.oraOop.oracleStats.OraOopOraStats");
+ Method finalizeSnapshotMethod =
+ oraOopOraStatsClass.getMethod("finalizeSnapshot", (Class<?>[]) null);
+ finalizeSnapshotMethod.invoke(oraOopOraStats, (Object[]) null);
+
+ Method performanceReportCsvMethod =
+ oraOopOraStatsClass.getMethod("getStatisticsCSV", (Class<?>[]) null);
+ result.setCsvReport((String) performanceReportCsvMethod.invoke(
+ oraOopOraStats, (Object[]) null));
+
+ Method performanceReportMethod =
+ oraOopOraStatsClass.getMethod("performanceReport", (Class<?>[]) null);
+ result.setPerformanceReport((String) performanceReportMethod.invoke(
+ oraOopOraStats, (Object[]) null));
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ } catch (NoSuchMethodException ex) {
+ throw new RuntimeException(ex);
+ } catch (InvocationTargetException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return result;
+ }
+
+ public static boolean stringArrayContains(String[] list, String value,
+ boolean ignoreCase) {
+
+ return stringArrayIndexOf(list, value, ignoreCase) > -1;
+ }
+
+ public static int stringArrayIndexOf(String[] list, String value,
+ boolean ignoreCase) {
+
+ for (int idx = 0; idx < list.length; idx++) {
+ if (list[idx].equals(value)) {
+ return idx;
+ }
+ if (ignoreCase && list[idx].equalsIgnoreCase(value)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
  /**
   * Returns the list as a comma-separated string with no value enclosure.
   * Equivalent to stringArrayToCSV(list, "").
   */
  public static String stringArrayToCSV(String[] list) {

    return stringArrayToCSV(list, "");
  }
+
+ public static String
+ stringArrayToCSV(String[] list, String encloseValuesWith) {
+
+ StringBuilder result = new StringBuilder((list.length * 2) - 1);
+ for (int idx = 0; idx < list.length; idx++) {
+ if (idx > 0) {
+ result.append(",");
+ }
+ result
+ .append(String.format("%1$s%2$s%1$s", encloseValuesWith, list[idx]));
+ }
+ return result.toString();
+ }
+
+ public static int stringListIndexOf(List<String> list, String value,
+ boolean ignoreCase) {
+
+ for (int idx = 0; idx < list.size(); idx++) {
+ if (list.get(idx).equals(value)) {
+ return idx;
+ }
+ if (ignoreCase && list.get(idx).equalsIgnoreCase(value)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ public static void writeOutputFile(org.apache.hadoop.conf.Configuration conf,
+ String fileName, String fileText) {
+
+ Path uniqueFileName = null;
+ try {
+ FileSystem fileSystem = FileSystem.get(conf);
+
+ // NOTE: This code is not thread-safe.
+ // i.e. A race-condition could still cause this code to 'fail'.
+
+ int suffix = 0;
+ String fileNameTemplate = fileName + "%s";
+ while (true) {
+ uniqueFileName =
+ new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+ suffix == 0 ? "" : String.format(" (%d)", suffix)));
+ if (!fileSystem.exists(uniqueFileName)) {
+ break;
+ }
+ suffix++;
+ }
+
+ FSDataOutputStream outputStream =
+ fileSystem.create(uniqueFileName, false);
+ if (fileText != null) {
+ outputStream.writeBytes(fileText);
+ }
+ outputStream.flush();
+ outputStream.close();
+ } catch (IOException ex) {
+ LOG.error(String.format("Error attempting to write the file %s\n" + "%s",
+ (uniqueFileName == null ? "null" : uniqueFileName.toUri()),
+ getFullExceptionMessage(ex)));
+ }
+ }
+
+ /**
+ * Class to wrap details about Oracle connection string.
+ */
+ public static class JdbcOracleThinConnection {
+ private String host;
+ private int port;
+ private String sid;
+ private String service;
+
+ public JdbcOracleThinConnection(String host, int port, String sid,
+ String service) {
+ this.host = host;
+ this.port = port;
+ this.sid = sid;
+ this.service = service;
+ }
+
+ @Override
+ public String toString() {
+
+ // Use the SID if it's available...
+ if (this.sid != null && !this.sid.isEmpty()) {
+ return String.format("jdbc:oracle:thin:@%s:%d:%s", this.host,
+ this.port, this.sid);
+ }
+
+ // Otherwise, use the SERVICE. Note that the service is prefixed by "/",
+ // not by ":"...
+ if (this.service != null && !this.service.isEmpty()) {
+ return String.format("jdbc:oracle:thin:@%s:%d/%s", this.host,
+ this.port, this.service);
+ }
+
+ throw new RuntimeException(
+ "Unable to generate a JDBC URL, as no SID or SERVICE has been "
+ + "provided.");
+
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getSid() {
+ return sid;
+ }
+
+ public String getService() {
+ return service;
+ }
+ }
+
+ /**
+ * Thrown if the Oracle connection string cannot be parsed.
+ */
+ public static class JdbcOracleThinConnectionParsingError extends Exception {
+
+ private static final long serialVersionUID = 1559860600099354233L;
+
+ public JdbcOracleThinConnectionParsingError(String message) {
+
+ super(message);
+ }
+
+ public JdbcOracleThinConnectionParsingError(String message,
+ Throwable cause) {
+
+ super(message, cause);
+ }
+
+ public JdbcOracleThinConnectionParsingError(Throwable cause) {
+
+ super(cause);
+ }
+ }
+
+ public static String getOracleServiceName(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.get(OraOopConstants.ORAOOP_ORACLE_RAC_SERVICE_NAME, "");
+ }
+
+ public static String generateOracleSidJdbcUrl(String hostName, int port,
+ String sid) {
+
+ return String.format("jdbc:oracle:thin:@(DESCRIPTION=" + "(ADDRESS_LIST="
+ + "(ADDRESS=(PROTOCOL=TCP)(HOST=%s)(PORT=%d))" + ")"
+ + "(CONNECT_DATA=(SERVER=DEDICATED)(SID=%s))" + ")", hostName, port,
+ sid);
+ }
+
+ public static String generateOracleServiceNameJdbcUrl(String hostName,
+ int port, String serviceName) {
+
+ return String.format("jdbc:oracle:thin:@(DESCRIPTION=" + "(ADDRESS_LIST="
+ + "(ADDRESS=(PROTOCOL=TCP)(HOST=%s)(PORT=%d))" + ")"
+ + "(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=%s))" + ")", hostName,
+ port, serviceName);
+ }
+
+ public static String getMapperJdbcUrlPropertyName(int mapperId,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return String.format("oraoop.mapper.jdbc.url.%d", mapperId);
+ }
+
+ public static final String SQOOP_JOB_TYPE = "oraoop.sqoop.job.type";
+
+ public static void rememberSqoopJobType(OraOopConstants.Sqoop.Tool jobType,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ conf.set(SQOOP_JOB_TYPE, jobType.name());
+ }
+
+ public static OraOopConstants.Sqoop.Tool recallSqoopJobType(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String jobType = conf.get(SQOOP_JOB_TYPE);
+ if (jobType == null || jobType.isEmpty()) {
+ throw new RuntimeException(
+ "RecallSqoopJobType() cannot be called unless RememberSqoopJobType() "
+ + "has been used.");
+ }
+
+ OraOopConstants.Sqoop.Tool result =
+ OraOopConstants.Sqoop.Tool.valueOf(jobType);
+ return result;
+ }
+
+ public static boolean omitLobAndLongColumnsDuringImport(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return conf.getBoolean(OraOopConstants.ORAOOP_IMPORT_OMIT_LOBS_AND_LONG,
+ false);
+ }
+
+ public static boolean oracleSessionHasBeenKilled(Exception exception) {
+
+ Throwable ex = exception;
+
+ while (ex != null) {
+ if (ex instanceof SQLException
+ && ((SQLException) ex).getErrorCode() == 28) { // ORA-00028: your
+ // session has been
+ // killed
+ return true;
+ }
+
+ ex = ex.getCause();
+ }
+
+ return false;
+ }
+
+ private static String
+ formatTimestampForOracleObjectName(Object oracleDateTime) {
+
+ // NOTE: Update decodeTimestampFromOracleObjectName() if you modify this
+ // method.
+
+ String jobTimeStr =
+ OraOopOracleQueries.oraDATEToString(oracleDateTime,
+ OraOopConstants.ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING);
+
+ return jobTimeStr;
+
+ // E.g. 20101028_151000 (15 characters)
+ }
+
+ private static Object decodeTimestampFromOracleObjectName(
+ String oracleObjectNameTimestampFragment) {
+
+ String dateString = oracleObjectNameTimestampFragment;
+ String dateFormatString =
+ OraOopConstants.ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING;
+
+ // return oracle.sql.DATE.fromText(oracleObjectNameTimestampFragment
+ // ,OraOopConstants.ORACLE_OBJECT_NAME_DATE_TO_STRING_FORMAT_STRING
+ // ,null);
+
+ /*
+ * Unfortunately, we don't seem to be able to reliably decode strings into
+ * DATE objects using Oracle. For example, the following string will cause
+ * Oracle to throw an "Invalid Oracle date" exception, due to the time
+ * portion starting with a zero...
+ * oracle.sql.DATE.fromText("20101123 091554", "yyyymmdd hh24miss", null);
+ *
+ * Therefore, we need to manually deconstruct the date string and insert
+ * some colons into the time so that Oracle can decode the string. (This is
+ * therefore an Oracle bug we're working around.)
+ */
+
+ try {
+ String year = oracleObjectNameTimestampFragment.substring(0, 4);
+ String month = oracleObjectNameTimestampFragment.substring(4, 6);
+ String day = oracleObjectNameTimestampFragment.substring(6, 8);
+ String hour = oracleObjectNameTimestampFragment.substring(9, 11);
+ String minute = oracleObjectNameTimestampFragment.substring(11, 13);
+ String second = oracleObjectNameTimestampFragment.substring(13, 15);
+ dateString =
+ String.format("%s/%s/%s %s:%s:%s", year, month, day, hour, minute,
+ second);
+ dateFormatString = "yyyy/mm/dd hh24:mi:ss";
+
+ return OraOopOracleQueries.oraDATEFromString(
+ dateString, dateFormatString);
+ } catch (Exception ex) {
+ LOG.debug(String.format(
+ "%s could not convert the string \"%s\" into a DATE via the format "
+ + "string \"%s\".\n" + "The error encountered was:\n%s",
+ getCurrentMethodName(), dateString, dateFormatString,
+ getFullExceptionMessage(ex)));
+
+ return null;
+ }
+ }
+
+ public static String createExportTablePartitionNameFromOracleTimestamp(
+ Object oracleDateTime) {
+
+ // Partition name can be up to 30 characters long and must start with a
+ // letter...
+ return OraOopConstants.EXPORT_TABLE_PARTITION_NAME_PREFIX
+ + formatTimestampForOracleObjectName(oracleDateTime);
+
+ // E.g. ORAOOP_20101028_151000 (22 characters)
+ }
+
+ public static String createExportTableNamePrefixFromOracleTimestamp(
+ Object oracleDateTime) {
+
+ // NOTE: Alter decodeExportTableNamePrefix() if you modify this method.
+
+ // Table name can be 30 characters long and must start with a letter...
+ return OraOopConstants.EXPORT_MAPPER_TABLE_NAME_PREFIX
+ + formatTimestampForOracleObjectName(oracleDateTime);
+ // G1.ORAOOP_20101028_152500 (22 characters) (This is just the prefix,
+ // append "_3" for mapper 4)
+ }
+
+ public static Object decodeExportTableNamePrefix(String tableNamePrefix) {
+
+ if (tableNamePrefix == null || tableNamePrefix.isEmpty()) {
+ return null;
+ }
+
+ if (!tableNamePrefix
+ .startsWith(OraOopConstants.EXPORT_MAPPER_TABLE_NAME_PREFIX)) {
+ return null;
+ }
+
+ String formattedTimestamp =
+ tableNamePrefix.substring(
+ OraOopConstants.EXPORT_MAPPER_TABLE_NAME_PREFIX.length(),
+ tableNamePrefix.length());
+
+ return decodeTimestampFromOracleObjectName(formattedTimestamp);
+ }
+
+ private static boolean userWantsToCreateExportTableFromTemplate(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String exportTableTemplate =
+ conf.get(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE, "");
+ if (exportTableTemplate.isEmpty()) {
+ return false;
+ }
+
+ OraOopConstants.Sqoop.Tool tool = OraOopUtilities.recallSqoopJobType(conf);
+ switch (tool) {
+ case UNKNOWN:
+ case EXPORT:
+ return true;
+
+ default:
+ return false;
+ }
+ }
+
+ public static boolean enableOracleParallelProcessingDuringExport(
+ org.apache.hadoop.conf.Configuration conf) {
+ return conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_PARALLEL, false);
+ }
+
+ public static boolean userWantsToCreatePartitionedExportTableFromTemplate(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return userWantsToCreateExportTableFromTemplate(conf)
+ && conf.getBoolean(
+ OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED, false);
+ }
+
+ public static boolean userWantsToCreateNonPartitionedExportTableFromTemplate(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ return userWantsToCreateExportTableFromTemplate(conf)
+ && !conf.getBoolean(
+ OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_PARTITIONED, false);
+ }
+
+ public static String generateExportTableSubPartitionName(int mapperId,
+ Object sysDateTime, org.apache.hadoop.conf.Configuration conf) {
+
+ String partitionName =
+ createExportTablePartitionNameFromOracleTimestamp(sysDateTime);
+
+ String subPartitionName = String.format("%s_MAP_%d" // <- Should allow for
+ // 1,000 mappers before
+ // exceeding 30
+ // characters
+ , partitionName // <- Partition name is 22 characters
+ , mapperId);
+
+ // Check the length of the name...
+ if (subPartitionName.length()
+ > OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
+ throw new RuntimeException(
+ String
+ .format(
+ "The generated Oracle subpartition name \"%s\" is longer "
+ + "than %d characters.",
+ subPartitionName,
+ OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH));
+ }
+
+ return subPartitionName;
+ }
+
+ public static String[] generateExportTableSubPartitionNames(int numMappers,
+ Object sysDateTime, org.apache.hadoop.conf.Configuration conf) {
+
+ String[] result = new String[numMappers];
+ for (int idx = 0; idx < numMappers; idx++) {
+ result[idx] = generateExportTableSubPartitionName(idx, sysDateTime, conf);
+ }
+
+ return result;
+ }
+
+ public static OracleTable generateExportTableMapperTableName(int mapperId,
+ Object sysDateTime, String schema) {
+ //mapperId: should allow 10,000,000 mappers before it exceeds 30 characters.
+ return generateExportTableMapperTableName(Integer.toString(mapperId)
+ , sysDateTime, schema);
+ }
+
+ public static OracleTable generateExportTableMapperTableName(
+ String mapperSuffix, Object sysDateTime, String schema) {
+
+ // NOTE: Update decodeExportTableMapperTableName() if you alter this method.
+
+ // Generate a (22 character) prefix to use for the N tables that need to be
+ // created for the N mappers to insert into...
+ String mapperTableNamePrefix =
+ createExportTableNamePrefixFromOracleTimestamp(sysDateTime);
+
+ // Generate the name...
+ String tableName = String.format("%s_%s", mapperTableNamePrefix // <- 22
+ // chars
+ , mapperSuffix);
+
+ // Check the length of the name...
+ if (tableName.length() > OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
+ throw new RuntimeException(
+ String
+ .format(
+ "The generated Oracle table name \"%s\" is longer than "
+ + "%d characters.",
+ tableName, OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH));
+ }
+
+ return new OracleTable(schema, tableName);
+ }
+
+ /**
+ * Class to wrap the table name to be used for the mapper.
+ */
+ public static class DecodedExportMapperTableName {
+ private String mapperId; // <- This is not an int, because it might be "CHG"
+ // in the case of a "changes-table".
+ private Object tableDateTime;
+
+ public String getMapperId() {
+ return mapperId;
+ }
+
+ public void setMapperId(String newMapperId) {
+ this.mapperId = newMapperId;
+ }
+
+ public Object getTableDateTime() {
+ return tableDateTime;
+ }
+
+ public void setTableDateTime(Object newTableDateTime) {
+ this.tableDateTime = newTableDateTime;
+ }
+ }
+
+ public static DecodedExportMapperTableName decodeExportTableMapperTableName(
+ OracleTable oracleTable) {
+
+ DecodedExportMapperTableName result = null;
+ try {
+ int lastUnderScoreIndex = oracleTable.getName().lastIndexOf("_");
+ if (lastUnderScoreIndex == -1) {
+ return result;
+ }
+
+ String dateFragment =
+ oracleTable.getName().substring(0, lastUnderScoreIndex);
+ String mapperIdFragment =
+ oracleTable.getName().substring(lastUnderScoreIndex + 1,
+ oracleTable.getName().length());
+
+ Object sysDateTime = decodeExportTableNamePrefix(dateFragment);
+ if (sysDateTime != null) {
+ result = new DecodedExportMapperTableName();
+ result.setTableDateTime(sysDateTime);
+ result.setMapperId(mapperIdFragment);
+ }
+ } catch (Exception ex) {
+ LOG.debug(
+ String.format(
+ "Error when attempting to decode the export mapper-table name \"%s\".",
+ oracleTable.toString()), ex);
+ }
+ return result;
+ }
+
+ public static void rememberOracleDateTime(
+ org.apache.hadoop.conf.Configuration conf, String propertyName,
+ String dateTime) {
+ conf.set(propertyName, dateTime);
+ }
+
+ public static Object recallOracleDateTime(
+ org.apache.hadoop.conf.Configuration conf, String propertyName) {
+
+ String dateTimeStr = conf.get(propertyName);
+ if (dateTimeStr == null || dateTimeStr.isEmpty()) {
+ throw new RuntimeException(String.format(
+ "Unable to recall the value of the property \"%s\".", propertyName));
+ }
+
+ return OraOopOracleQueries.oraDATEFromString(dateTimeStr,
+ "yyyy-mm-dd hh24:mi:ss");
+ }
+
+ public static UpdateMode getExportUpdateMode(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ // NB: The Sqoop code does not add the column specified in the
+ // "--update-key" argument
+ // as a configuration property value until quite late in the process. i.e.
+ // After the
+ // OraOopManagerFactory.accept() have been called.
+ // (It is available via sqoopOptions.getUpdateKeyCol() however.)
+ // Therefore, when calling this method you need to be confident that the
+ // export being
+ // performed is actually an "update" export and not an "import" one.
+
+ // String updateKeyCol =
+ // conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+ // if(updateKeyCol == null ||
+ // updateKeyCol.isEmpty())
+ // throw new
+ // RuntimeException(String.format("This job is not an update-export. "+
+ // "i.e. %s has not been specified."
+ // ,ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY));
+
+ UpdateMode updateMode = UpdateMode.Update;
+
+ boolean mergeData =
+ conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_MERGE, false);
+ if (mergeData) {
+ updateMode = UpdateMode.Merge;
+ }
+
+ return updateMode;
+ }
+
+ public static InsertMode getExportInsertMode(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ InsertMode result = InsertMode.DirectInsert;
+
+ if (OraOopUtilities
+ .userWantsToCreatePartitionedExportTableFromTemplate(conf)
+ || conf.getBoolean(OraOopConstants.EXPORT_TABLE_HAS_ORAOOP_PARTITIONS,
+ false)) {
+ result = InsertMode.ExchangePartition;
+ }
+
+ return result;
+ }
+
+ public static String getJavaClassPath() {
+
+ return System.getProperty("java.class.path");
+ }
+
+ public static String replaceAll(String inputString, String textToReplace,
+ String replaceWith) {
+
+ String result = inputString.replaceAll(textToReplace, replaceWith);
+ if (!result.equals(inputString)) {
+ result = replaceAll(result, textToReplace, replaceWith);
+ }
+
+ return result;
+ }
+
+ public static String getTemporaryTableStorageClause(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String result =
+ conf.get(OraOopConstants.ORAOOP_TEMPORARY_TABLE_STORAGE_CLAUSE, "");
+ if (result == null) {
+ result = "";
+ }
+ return result;
+ }
+
+ public static String getExportTableStorageClause(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String result =
+ conf.get(OraOopConstants.ORAOOP_EXPORT_TABLE_STORAGE_CLAUSE, "");
+ if (result == null) {
+ result = "";
+ }
+ return result;
+ }
+
+ public static String[] getExportUpdateKeyColumnNames(SqoopOptions options) {
+
+ String updateKey = options.getUpdateKeyCol();
+ return getExtraExportUpdateKeyColumnNames(updateKey, options.getConf());
+ }
+
+ public static String[] getExportUpdateKeyColumnNames(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String updateKey = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+ return getExtraExportUpdateKeyColumnNames(updateKey, conf);
+ }
+
+ /**
+ * Splits a string separated by commas - the elements can be optionally
+ * enclosed in quotes - this allows the elements to have commas in them.
+ *
+ * @param value
+ * The String to be split
+ * @return A list of values
+ */
+ public static List<String> splitStringList(String value) {
+ List<String> result = new ArrayList<String>();
+ if (value != null && !value.isEmpty()) {
+ Pattern pattern = Pattern.compile("([^\",]*|\"[^\"]*\")(,|$)");
+ Matcher matcher = pattern.matcher(value);
+ while (matcher.find()) {
+ if (matcher.group(1) != null && !matcher.group(1).isEmpty()) {
+ result.add(matcher.group(1));
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Splits a string list separated by commas. If the element is not surrounded
+ * by quotes it will be return in upper case. If the element is enclosed in
+ * quotes it will be returned in the same case and special characters will be
+ * retained.
+ *
+ * @param value
+ * The String to be split
+ * @return A list of values
+ */
+ public static List<String> splitOracleStringList(String value) {
+ List<String> result = new ArrayList<String>();
+ List<String> splitValue = splitStringList(value);
+ Pattern pattern = Pattern.compile("(\")([^\"]*)(\")");
+ for (String element : splitValue) {
+ Matcher matcher = pattern.matcher(element);
+ if (matcher.find()) {
+ result.add(matcher.group(2));
+ } else {
+ result.add(element.toUpperCase());
+ }
+ }
+ return result;
+ }
+
+ private static String[] getExtraExportUpdateKeyColumnNames(String updateKey,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ if (updateKey == null) {
+ // This must be an "insert-export" if no --update-key has been specified!
+ return new String[0];
+ }
+
+ String extraKeys =
+ conf.get(OraOopConstants.ORAOOP_UPDATE_KEY_EXTRA_COLUMNS, "");
+ if (!extraKeys.isEmpty()) {
+ updateKey += "," + extraKeys;
+ }
+
+ String[] columnNames = updateKey.split(",");
+ for (int idx = 0; idx < columnNames.length; idx++) {
+ columnNames[idx] = columnNames[idx].trim();
+ if (!columnNames[idx].startsWith("\"")) {
+ columnNames[idx] = columnNames[idx].toUpperCase();
+ }
+
+ }
+ return columnNames;
+ }
+
+ public static OraOopConstants.AppendValuesHintUsage
+ getOracleAppendValuesHintUsage(org.apache.hadoop.conf.Configuration conf) {
+
+ if (conf == null) {
+ throw new IllegalArgumentException("The conf argument cannot be null");
+ }
+
+ String strUsage =
+ conf.get(OraOopConstants.ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE);
+ if (strUsage == null) {
+ return OraOopConstants.AppendValuesHintUsage.AUTO;
+ }
+
+ OraOopConstants.AppendValuesHintUsage result;
+
+ try {
+ strUsage = strUsage.toUpperCase().trim();
+ result = OraOopConstants.AppendValuesHintUsage.valueOf(strUsage);
+ } catch (IllegalArgumentException ex) {
+ result = OraOopConstants.AppendValuesHintUsage.AUTO;
+
+ String errorMsg =
+ String
+ .format(
+ "An invalid value of \"%s\" was specified for the \"%s\" "
+ + "configuration property value.\n"
+ + "\tValid values are: %s\n"
+ + "\tThe default value of %s will be used.", strUsage,
+ OraOopConstants.ORAOOP_ORACLE_APPEND_VALUES_HINT_USAGE,
+ getOraOopOracleAppendValuesHintUsageValues(),
+ OraOopConstants.AppendValuesHintUsage.AUTO.name());
+ LOG.error(errorMsg);
+ }
+
+ return result;
+ }
+
+ private static String getOraOopOracleAppendValuesHintUsageValues() {
+
+ OraOopConstants.AppendValuesHintUsage[] values =
+ OraOopConstants.AppendValuesHintUsage.values();
+
+ StringBuilder result = new StringBuilder((2 * values.length) - 1); // <-
+ // Include
+ // capacity
+ // for
+ // commas
+
+ for (int idx = 0; idx < values.length; idx++) {
+ OraOopConstants.AppendValuesHintUsage value = values[idx];
+ if (idx > 0) {
+ result.append(" or ");
+ }
+ result.append(value.name());
+ }
+ return result.toString();
+ }
+
+ public static String getImportHint(
+ org.apache.hadoop.conf.Configuration conf) {
+ String result = null;
+ result = conf.get(OraOopConstants.IMPORT_QUERY_HINT);
+ if (result == null || result.trim().isEmpty()) {
+ result = "";
+ } else {
+ result = String.format(OraOopConstants.Oracle.HINT_SYNTAX, result);
+ }
+ return result;
+ }
+
+ public static void appendJavaSecurityEgd(Configuration conf) {
+ String mapredJavaOpts = conf.get("mapred.child.java.opts");
+ if (mapredJavaOpts == null
+ || !mapredJavaOpts.contains("-Djava.security.egd")) {
+ StringBuilder newMapredJavaOpts =
+ new StringBuilder("-Djava.security.egd=file:///dev/urandom");
+ if (mapredJavaOpts != null && !mapredJavaOpts.isEmpty()) {
+ newMapredJavaOpts.append(" ").append(mapredJavaOpts);
+ }
+ String newMapredJavaOptsString = newMapredJavaOpts.toString();
+ conf.set("mapred.child.java.opts", newMapredJavaOptsString);
+ LOG.debug("Updated mapred.child.java.opts from \"" + mapredJavaOpts
+ + "\" to \"" + newMapredJavaOptsString + "\"");
+ }
+ }
+
+ public static void checkJavaSecurityEgd() {
+ String javaSecurityEgd = System.getProperty("java.security.egd");
+ if (!"file:///dev/urandom".equals(javaSecurityEgd)) {
+ LOG.warn("System property java.security.egd is not set to "
+ + "file:///dev/urandom - Oracle connections may time out.");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OracleActiveInstance.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OracleActiveInstance.java b/src/java/org/apache/sqoop/manager/oracle/OracleActiveInstance.java
new file mode 100644
index 0000000..180da53
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OracleActiveInstance.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+/**
+ * Wraps data from v$active_instances.
+ */
+public class OracleActiveInstance {
+
+ // Instance name and host name of one active RAC instance, as reported
+ // by a row of v$active_instances.
+ private String instanceName;
+ private String hostName;
+
+ /** @return the Oracle instance name, or null if not yet set. */
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ /** @param newInstanceName the Oracle instance name to store. */
+ public void setInstanceName(String newInstanceName) {
+ this.instanceName = newInstanceName;
+ }
+
+ /** @return the host name the instance runs on, or null if not yet set. */
+ public String getHostName() {
+ return hostName;
+ }
+
+ /** @param newHostName the host name to store. */
+ public void setHostName(String newHostName) {
+ this.hostName = newHostName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OracleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OracleConnectionFactory.java b/src/java/org/apache/sqoop/manager/oracle/OracleConnectionFactory.java
new file mode 100644
index 0000000..094576b
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OracleConnectionFactory.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Create and initialize connections to Oracle RDBMS.
+ */
+public class OracleConnectionFactory {
+
+ // Protected (not private) constructor: prevents direct instantiation of
+ // this static-utility class while still permitting subclassing.
+ protected OracleConnectionFactory() {
+ }
+
+ protected static final OraOopLog LOG = OraOopLogFactory
+ .getLog(OracleConnectionFactory.class.getName());
+
+ /**
+ * Creates an Oracle JDBC connection with no additional driver
+ * properties. Convenience overload delegating to the five-argument
+ * variant with a null Properties object.
+ *
+ * @param jdbcDriverClassName fully-qualified JDBC driver class to load
+ * @param jdbcUrl JDBC connection URL
+ * @param username database user (may be null)
+ * @param password database password (may be null)
+ * @return an open JDBC connection
+ * @throws SQLException if the connection cannot be established
+ */
+ public static Connection createOracleJdbcConnection(
+ String jdbcDriverClassName, String jdbcUrl, String username,
+ String password) throws SQLException {
+ Properties props = null;
+ return createOracleJdbcConnection(jdbcDriverClassName, jdbcUrl, username,
+ password, props);
+ }
+
+ /**
+ * Loads the named JDBC driver class and opens a connection to the given
+ * URL. Note that the returned connection has NOT been through
+ * initializeOracleConnection() - see the comment below for why.
+ *
+ * @param jdbcDriverClassName fully-qualified JDBC driver class to load
+ * @param jdbcUrl JDBC connection URL
+ * @param username database user (may be null)
+ * @param password database password (may be null)
+ * @param additionalProps extra driver properties, merged over
+ * user/password (may be null or empty)
+ * @return an open, uninitialized JDBC connection
+ * @throws SQLException if the connection cannot be established
+ */
+ public static Connection createOracleJdbcConnection(
+ String jdbcDriverClassName, String jdbcUrl, String username,
+ String password, Properties additionalProps) throws SQLException {
+
+ loadJdbcDriver(jdbcDriverClassName);
+ Connection connection =
+ createConnection(jdbcUrl, username, password, additionalProps);
+
+ // Only OraOopDBRecordReader will call initializeOracleConnection(), as
+ // we only need to initialize the session(s) prior to the mapper starting
+ // its job.
+ // i.e. We don't need to initialize the sessions in order to get the
+ // table's data-files etc.
+
+ // initializeOracleConnection(connection, conf);
+
+ return connection;
+ }
+
+ /**
+ * Loads the JDBC driver class by name so DriverManager can find it.
+ * NOTE(review): wraps ClassNotFoundException in a RuntimeException
+ * without passing the original exception as the cause, losing the
+ * underlying stack trace (it is logged first, however).
+ */
+ private static void loadJdbcDriver(String jdbcDriverClassName) {
+
+ try {
+ Class.forName(jdbcDriverClassName);
+ } catch (ClassNotFoundException ex) {
+ String errorMsg =
+ "Unable to load the jdbc driver class : " + jdbcDriverClassName;
+ LOG.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
+ /**
+ * Opens a DriverManager connection using a Properties object built from
+ * the username/password (each only set when non-null) merged with any
+ * additional properties. Failures are logged with the URL and user
+ * before the SQLException is rethrown.
+ */
+ private static Connection createConnection(String jdbcUrl, String username,
+ String password, Properties additionalProps) throws SQLException {
+
+ Properties props = new Properties();
+ if (username != null) {
+ props.put("user", username);
+ }
+
+ if (password != null) {
+ props.put("password", password);
+ }
+
+ // additionalProps can override user/password since putAll runs last.
+ if (additionalProps != null && additionalProps.size() > 0) {
+ props.putAll(additionalProps);
+ }
+
+ try {
+ return DriverManager.getConnection(jdbcUrl, props);
+ } catch (SQLException ex) {
+ String errorMsg = String.format(
+ "Unable to obtain a JDBC connection to the URL \"%s\" as user \"%s\": ",
+ jdbcUrl, (username != null) ? username : "[null]");
+ LOG.error(errorMsg, ex);
+ throw ex;
+ }
+ }
+
+ /**
+ * Initializes an Oracle session for use by a mapper: sets read-committed
+ * isolation, the session time zone, client-info (module/action), the
+ * JDBC fetch size, and then runs any user-configured session
+ * initialization statements.
+ *
+ * @param connection the connection whose session is initialized
+ * @param conf the job configuration supplying the settings
+ * @throws SQLException if a required initialization step fails
+ */
+ public static void initializeOracleConnection(Connection connection,
+ org.apache.hadoop.conf.Configuration conf) throws SQLException {
+
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+
+ OraOopOracleQueries.setConnectionTimeZone(connection, conf);
+
+ setSessionClientInfo(connection, conf);
+
+ OraOopOracleQueries.setJdbcFetchSize(connection, conf);
+
+ executeOraOopSessionInitializationStatements(connection, conf);
+ }
+
+ /**
+ * Tags the Oracle session with a module and action name via
+ * dbms_application_info so DBAs can identify OraOop sessions. Failures
+ * are logged and swallowed - tagging is best-effort only.
+ * NOTE(review): the Statement created here is never closed; it is only
+ * released when the connection is closed.
+ */
+ protected static void setSessionClientInfo(Connection connection,
+ org.apache.hadoop.conf.Configuration conf) {
+
+ String sql = "";
+ try {
+ sql =
+ "begin \n"
+ + " dbms_application_info.set_module(module_name => "
+ + "'%s', action_name => '%s'); \n"
+ + "end;";
+
+ String oracleSessionActionName =
+ conf.get(OraOopConstants.ORACLE_SESSION_ACTION_NAME);
+
+ sql =
+ String.format(sql, OraOopConstants.ORACLE_SESSION_MODULE_NAME,
+ oracleSessionActionName);
+
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ LOG.info("Initializing Oracle session with SQL :\n" + sql);
+ } catch (Exception ex) {
+ LOG.error(String.format("An error occurred while attempting to execute "
+ + "the following Oracle session-initialization statement:" + "\n%s"
+ + "\nError:" + "\n%s", sql, ex.getMessage()));
+ }
+ }
+
+ /**
+ * Runs each configured session-initialization statement on the given
+ * connection. A warning is logged if no statements are configured; each
+ * statement failure is logged and swallowed so remaining statements
+ * still run (best-effort).
+ * NOTE(review): the per-statement Statement objects are never closed.
+ */
+ protected static void executeOraOopSessionInitializationStatements(
+ Connection connection, org.apache.hadoop.conf.Configuration conf) {
+
+ List<String> statements = parseOraOopSessionInitializationStatements(conf);
+
+ if (statements.size() == 0) {
+ LOG.warn(String
+ .format(
+ "No Oracle 'session initialization' statements were found to "
+ + "execute.\nCheck that your %s and/or %s files are correctly "
+ + "installed in the ${SQOOP_HOME}/conf directory.",
+ OraOopConstants.ORAOOP_SITE_TEMPLATE_FILENAME,
+ OraOopConstants.ORAOOP_SITE_FILENAME));
+ } else {
+ for (String statement : statements) {
+ try {
+ connection.createStatement().execute(statement);
+ LOG.info("Initializing Oracle session with SQL : " + statement);
+ } catch (Exception ex) {
+ LOG.error(String.format(
+ "An error occurred while attempting to execute "
+ + "the following Oracle session-initialization statement:"
+ + "\n%s" + "\nError:" + "\n%s", statement, ex.getMessage()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Parses the configured session-initialization property into individual
+ * SQL statements: splits on ";", trims each piece, drops empties and
+ * SQL-comment lines, and expands configuration expressions in each
+ * statement via OraOopUtilities.replaceConfigurationExpression().
+ * NOTE(review): splitting on ";" would also split a statement that
+ * contains a literal semicolon (e.g. inside a string or PL/SQL block) -
+ * confirm this is acceptable for the supported statements.
+ * NOTE(review): the error message below ("No configuration argument
+ * must be specified.") reads oddly for a null-conf check; it is a
+ * runtime string, so it is left unchanged here.
+ *
+ * @param conf the configuration to read statements from; must not be null
+ * @return the parsed statements, possibly empty, never null
+ */
+ protected static List<String> parseOraOopSessionInitializationStatements(
+ org.apache.hadoop.conf.Configuration conf) {
+
+ ArrayList<String> result = new ArrayList<String>();
+
+ if (conf == null) {
+ throw new IllegalArgumentException(
+ "No configuration argument must be specified.");
+ }
+
+ String sessionInitializationStatements =
+ conf.get(OraOopConstants.ORAOOP_SESSION_INITIALIZATION_STATEMENTS);
+ if (sessionInitializationStatements != null
+ && !sessionInitializationStatements.isEmpty()) {
+ String[] initializationStatements =
+ sessionInitializationStatements.split(";");
+ for (String initializationStatement : initializationStatements) {
+ initializationStatement = initializationStatement.trim();
+ if (initializationStatement != null
+ && !initializationStatement.isEmpty()
+ && !initializationStatement
+ .startsWith(OraOopConstants.Oracle.
+ ORACLE_SQL_STATEMENT_COMMENT_TOKEN)) {
+
+ LOG.debug(String
+ .format(
+ "initializationStatement (quoted & pre-expression "
+ + "evaluation) = \"%s\"",
+ initializationStatement));
+
+ initializationStatement =
+ OraOopUtilities.replaceConfigurationExpression(
+ initializationStatement, conf);
+
+ result.add(initializationStatement);
+ }
+ }
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6bfaa9d6/src/java/org/apache/sqoop/manager/oracle/OracleTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OracleTable.java b/src/java/org/apache/sqoop/manager/oracle/OracleTable.java
new file mode 100644
index 0000000..a0ecce1
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/oracle/OracleTable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.oracle;
+
+/**
+ * Contains details about an Oracle table.
+ */
+public class OracleTable {
+
+ // Owning schema (may be null/empty) and table name.
+ private String schema;
+ private String name;
+
+ /** @return the owning schema, or null/empty when none was supplied. */
+ public String getSchema() {
+ return schema;
+ }
+
+ // Setters are private: instances are effectively immutable once
+ // constructed.
+ private void setSchema(String newSchema) {
+ this.schema = newSchema;
+ }
+
+ /** @return the table name. */
+ public String getName() {
+ return name;
+ }
+
+ private void setName(String newName) {
+ this.name = newName;
+ }
+
+ /** Creates an empty OracleTable with null schema and name. */
+ public OracleTable() {
+
+ }
+
+ /**
+ * @param schema the owning schema
+ * @param name the table name
+ */
+ public OracleTable(String schema, String name) {
+
+ setSchema(schema);
+ setName(name);
+ }
+
+ /** @param name the table name; schema is left null. */
+ public OracleTable(String name) {
+ setName(name);
+ }
+
+ /**
+ * Renders the table as a double-quoted Oracle identifier:
+ * "SCHEMA"."NAME" when a schema is present, otherwise just "NAME".
+ */
+ @Override
+ public String toString() {
+ String result =
+ (getSchema() == null || getSchema().isEmpty()) ? "" : "\""
+ + getSchema() + "\".";
+ result += "\"" + getName() + "\"";
+ return result;
+ }
+
+}