You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/05 18:40:59 UTC
[5/6] sqoop git commit: SQOOP-2595: Add Oracle connector to Sqoop 2
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
new file mode 100644
index 0000000..b741dc8
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcLoader.java
@@ -0,0 +1,615 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumn;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleVersion;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.type.Column;
+import org.joda.time.LocalDateTime;
+
+public class OracleJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+
+ // Log under this class's own name so that loader messages are attributed
+ // to (and can be filtered by) OracleJdbcLoader.
+ private static final Logger LOG =
+     Logger.getLogger(OracleJdbcLoader.class);
+
+ // Number of records bound for insertion so far; reported by
+ // getRowsWritten().
+ private long rowsWritten = 0;
+ // Framework context handed to load(); source of the schema, the data
+ // reader and job-level properties.
+ private LoaderContext context;
+ // JDBC connection opened in load() and used for every statement.
+ private Connection connection;
+ // Version of the connected Oracle database, read in load().
+ private OracleVersion oracleVersion;
+ // The table rows are inserted into. If exporting into a partitioned table,
+ // this table will be unique for this mapper.
+ private OracleTable table;
+ // The columns in the table we're inserting rows into.
+ private OracleTableColumns tableColumns;
+ // The index of this Hadoop mapper.
+ private int mapperId;
+ // Whether the export table contains the mapper-row pseudo-column
+ // (OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW).
+ private boolean tableHasMapperRowNumberColumn;
+ // The 1-based row number being processed by this mapper. It is bound to the
+ // mapper-row pseudo-column when the table has one.
+ private long mapperRowNumber;
+ // Whether to use the " /*+APPEND_VALUES*/ " hint within the Oracle SQL
+ // statement we generate.
+ private boolean useAppendValuesOracleHint = false;
+ // The number of rows skipped by insertData() because (one of) the
+ // update-key columns held a NULL value.
+ private long numberOfRowsSkipped;
+ // Update-key column names; only setupUpdate() assigns this, so it stays
+ // null for plain insert jobs.
+ private String[] updateColumnNames;
+ // Number of rows sent to Oracle per executeBatch() call.
+ private int rowsPerBatch;
+ // Number of rows written between commits.
+ private int rowsPerCommit;
+
+
+ /**
+  * Prepares this loader for a plain insert job: resolves the target table,
+  * reads its columns and decides whether the APPEND_VALUES hint may be used.
+  *
+  * @param linkConfiguration link configuration supplying the Oracle user name
+  * @param jobConfiguration job configuration supplying the target table name
+  * @throws SQLException if a database call made during setup fails
+  */
+ private void setupInsert(LinkConfiguration linkConfiguration,
+     ToJobConfiguration jobConfiguration) throws SQLException {
+   // Is each mapper inserting rows into a unique table?...
+   InsertMode insertMode = OracleUtilities.getExportInsertMode(
+       jobConfiguration.toJobConfig, context.getContext());
+   boolean exchangePartition = insertMode == InsertMode.ExchangePartition;
+
+   if (exchangePartition) {
+     // Every mapper writes into its own table, named after the mapper id and
+     // the date-time recalled from the context.
+     Object sysDateTime =
+         OracleUtilities.recallOracleDateTime(context.getContext());
+     table = OracleUtilities.generateExportTableMapperTableName(
+         mapperId, sysDateTime, null);
+   } else {
+     table = OracleUtilities.decodeOracleTableName(
+         linkConfiguration.connectionConfig.username,
+         jobConfiguration.toJobConfig.tableName);
+   }
+
+   tableColumns = OracleQueries.getToTableColumns(
+       connection, table, true, false);
+   tableHasMapperRowNumberColumn = null != tableColumns.findColumnByName(
+       OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW);
+
+   // Should we use the APPEND_VALUES Oracle hint?...
+   // NB: "Direct inserts" cannot utilize APPEND_VALUES, otherwise Oracle
+   // will serialize the N mappers, causing a lot of lock contention.
+   useAppendValuesOracleHint =
+       exchangePartition && canUseOracleAppendValuesHint();
+ }
+
+ /**
+  * Prepares this loader for a job that has an update key: rows are written
+  * to this mapper's own table, whose name is generated from the mapper id
+  * and the date-time recalled from the context.
+  *
+  * @param linkConfiguration link configuration (not read by this method)
+  * @param jobConfiguration job configuration supplying the update mode and
+  *        the update-key columns
+  * @throws SQLException if a database call made during setup fails
+  */
+ private void setupUpdate(LinkConfiguration linkConfiguration,
+     ToJobConfiguration jobConfiguration) throws SQLException {
+   UpdateMode updateMode =
+       OracleUtilities.getExportUpdateMode(jobConfiguration.toJobConfig);
+
+   Object sysDateTime =
+       OracleUtilities.recallOracleDateTime(context.getContext());
+   table = OracleUtilities.generateExportTableMapperTableName(
+       mapperId, sysDateTime, null);
+
+   updateColumnNames = OracleUtilities.getExportUpdateKeyColumnNames(
+       jobConfiguration.toJobConfig);
+
+   tableColumns = OracleQueries.getToTableColumns(
+       connection, table, true, false);
+
+   boolean mergeOrUpdate =
+       updateMode == UpdateMode.Merge || updateMode == UpdateMode.Update;
+   if (mergeOrUpdate) {
+     // Should we use the APPEND_VALUES Oracle hint?...
+     useAppendValuesOracleHint = canUseOracleAppendValuesHint();
+   }
+ }
+
+ @Override
+ public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+ ToJobConfiguration jobConfiguration) throws Exception {
+ LOG.debug("Running Oracle JDBC connector loader");
+ this.context = context;
+
+ //TODO: Mapper ID
+ mapperId = 1;
+ //TODO: Hardcoded values
+ rowsPerBatch = 5000;
+ rowsPerCommit = 5000;
+
+ // Retrieve the JDBC URL that should be used by this mapper.
+ // We achieve this by modifying the JDBC URL property in the
+ // configuration, prior to the OraOopDBRecordWriter's (ancestral)
+ // constructor using the configuration to establish a connection
+ // to the database - via DBConfiguration.getConnection()...
+ String mapperJdbcUrlPropertyName =
+ OracleUtilities.getMapperJdbcUrlPropertyName(mapperId);
+
+ // Get this mapper's JDBC URL
+ String mapperJdbcUrl = context.getString(mapperJdbcUrlPropertyName, null);
+
+ LOG.debug(String.format("Mapper %d has a JDBC URL of: %s", mapperId,
+ mapperJdbcUrl == null ? "<null>" : mapperJdbcUrl));
+
+ connection = OracleConnectionFactory.createOracleJdbcConnection(
+ OracleJdbcConnectorConstants.ORACLE_JDBC_DRIVER_CLASS,
+ mapperJdbcUrl,
+ linkConfiguration.connectionConfig.username,
+ linkConfiguration.connectionConfig.password);
+ String thisOracleInstanceName =
+ OracleQueries.getCurrentOracleInstanceName(connection);
+ LOG.info(String.format(
+ "This record writer is connected to Oracle via the JDBC URL: \n"
+ + "\t\"%s\"\n" + "\tto the Oracle instance: \"%s\"", connection
+ .toString(), thisOracleInstanceName));
+ OracleConnectionFactory.initializeOracleConnection(
+ connection, linkConfiguration.connectionConfig);
+ connection.setAutoCommit(false);
+ oracleVersion = OracleQueries.getOracleVersion(connection);
+
+ if (jobConfiguration.toJobConfig.updateKey == null ||
+ jobConfiguration.toJobConfig.updateKey.isEmpty()) {
+ setupInsert(linkConfiguration, jobConfiguration);
+ } else {
+ setupUpdate(linkConfiguration, jobConfiguration);
+ }
+
+ // Has the user forced the use of APPEND_VALUES either on or off?...
+ useAppendValuesOracleHint =
+ allowUserToOverrideUseOfTheOracleAppendValuesHint(
+ jobConfiguration.toJobConfig,
+ useAppendValuesOracleHint);
+
+ insertData();
+ connection.close();
+ }
+
+ /**
+  * @return the number of records this loader has bound for insertion so far
+  */
+ @Override
+ public long getRowsWritten() {
+   return this.rowsWritten;
+ }
+
+ /**
+  * Reads every record from the data reader and writes it to Oracle using
+  * JDBC batches.
+  *
+  * <p>Records whose update-key column(s) contain a NULL are skipped and
+  * counted. Batches are executed every {@code rowsPerBatch} rows and the
+  * transaction is committed every {@code rowsPerCommit} rows, plus once at
+  * the end. The prepared statement is closed even when writing fails.
+  *
+  * @throws Exception if reading a record or any database operation fails
+  */
+ private void insertData() throws Exception {
+   // If using APPEND_VALUES, check the batch size and commit frequency...
+   if (useAppendValuesOracleHint) {
+     if (rowsPerBatch
+         < OracleJdbcConnectorConstants.MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT) {
+       LOG.info(String.format(
+           "The number of rows per batch-insert has been changed from %d "
+               + "to %d. This is in response "
+               + "to the Oracle APPEND_VALUES hint being used.",
+           rowsPerBatch,
+           OracleJdbcConnectorConstants.MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT));
+       rowsPerBatch =
+           OracleJdbcConnectorConstants.MIN_APPEND_VALUES_BATCH_SIZE_DEFAULT;
+     }
+     // Need to commit after each batch when using APPEND_VALUES
+     if (rowsPerCommit != rowsPerBatch) {
+       LOG.info(String.format(
+           "The number of rows to insert per commit has been "
+               + "changed from %d to %d. This is in response "
+               + "to the Oracle APPEND_VALUES hint being used.",
+           rowsPerCommit, rowsPerBatch));
+       rowsPerCommit = rowsPerBatch;
+     }
+   }
+
+   mapperRowNumber = 1;
+
+   String sql = getBatchInsertSqlStatement(useAppendValuesOracleHint
+       ? "/*+APPEND_VALUES*/" : "");
+
+   Column[] columns = context.getSchema().getColumnsArray();
+
+   // Positions (within a record) of the update-key columns; null when this
+   // is a plain insert and no NULL-key check is needed.
+   List<Integer> updateColumnIndexes = null;
+   if (updateColumnNames != null) {
+     updateColumnIndexes = new ArrayList<Integer>();
+     for (String updateColumnName : updateColumnNames) {
+       for (int i = 0; i < columns.length; i++) {
+         if (columns[i].getName().equals(updateColumnName)) {
+           updateColumnIndexes.add(i);
+         }
+       }
+     }
+   }
+
+   PreparedStatement statement = connection.prepareStatement(sql);
+   try {
+     Object[] array;
+     while ((array = context.getDataReader().readArrayRecord()) != null) {
+       if (updateColumnIndexes != null) {
+         boolean updateKeyValueIsNull = false;
+         for (Integer i : updateColumnIndexes) {
+           if (array[i] == null) {
+             updateKeyValueIsNull = true;
+             break;
+           }
+         }
+         if (updateKeyValueIsNull) {
+           // A NULL key can never match a row in the Oracle table.
+           this.numberOfRowsSkipped++;
+           continue;
+         }
+       }
+
+       rowsWritten++;
+       configurePreparedStatementColumns(statement, columns, array);
+       if (rowsWritten % rowsPerBatch == 0) {
+         statement.executeBatch();
+       }
+       if (rowsWritten % rowsPerCommit == 0) {
+         connection.commit();
+       }
+     }
+
+     // Flush the final, partially filled batch.
+     if (rowsWritten % rowsPerBatch != 0) {
+       statement.executeBatch();
+     }
+     connection.commit();
+   } finally {
+     // Release the statement whether or not the load succeeded.
+     statement.close();
+   }
+
+   if (numberOfRowsSkipped > 0) {
+     LOG.warn(String.format(
+         "%d records were skipped due to a NULL value within one of the "
+             + "update-key column(s).\nHaving a NULL value prevents a record "
+             + "from being able to be matched to a row in the Oracle table.",
+         numberOfRowsSkipped));
+   }
+ }
+
+ /**
+  * Builds the INSERT statement used for batch loading, with one named bind
+  * variable (or pseudo-column expression) per column of the target table.
+  *
+  * @param oracleHint text placed between "insert" and "into"; may be empty
+  * @return the SQL text of the insert statement
+  */
+ private String getBatchInsertSqlStatement(String oracleHint) {
+   StringBuilder columnList = new StringBuilder();
+   StringBuilder valueList = new StringBuilder();
+
+   /*
+    * NOTE: "this.tableColumns" may contain a different list of columns than
+    * the schema of the records being loaded. In particular it includes any
+    * pseudo-columns that were added to the export table (and don't exist in
+    * the data being read). For example, if exporting to a partitioned table,
+    * there are pseudo-columns that identify the export job and the mapper.
+    */
+   for (int idx = 0; idx < this.tableColumns.size(); idx++) {
+     OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
+     String columnName = oracleTableColumn.getName();
+
+     // Separate entries of both lists from the second column onwards.
+     if (idx > 0) {
+       columnList.append("\n,");
+       valueList.append("\n,");
+     }
+     columnList.append(columnName);
+
+     // Pseudo-columns get a literal expression instead of a bind variable.
+     String valueExpression = generateInsertValueForPseudoColumn(columnName);
+
+     if (valueExpression == null
+         && oracleTableColumn.getOracleType()
+             == OracleQueries.getOracleType("STRUCT")
+         && oracleTableColumn.getDataType().equals(
+             OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+       valueExpression = String.format("urifactory.getUri(%s)",
+           columnNameToBindVariable(columnName));
+     }
+     //TODO: Date as string? When timestamps are mapped as strings the bind
+     // variable may need wrapping, e.g.
+     //   DATE         -> to_date(%s, 'yyyy-mm-dd hh24:mi:ss')
+     //   TIMESTAMP    -> to_timestamp(%s, 'yyyy-mm-dd hh24:mi:ss.ff')
+     //   TIMESTAMPTZ  -> to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')
+     //   TIMESTAMPLTZ -> to_timestamp_tz(%s, 'yyyy-mm-dd hh24:mi:ss.ff TZR')
+
+     if (valueExpression == null) {
+       valueExpression = columnNameToBindVariable(columnName);
+     }
+     valueList.append(valueExpression);
+   }
+
+   String sql = String.format("insert %s into %s\n(%s)\nvalues\n(%s)\n",
+       oracleHint, this.table.toString(), columnList.toString(),
+       valueList.toString());
+
+   LOG.info("Batch-Mode insert statement:\n" + sql);
+   return sql;
+ }
+
+ /**
+  * Returns the SQL expression to insert for one of the export
+  * pseudo-columns.
+  *
+  * @param columnName name of the table column being considered
+  * @return a {@code to_date(...)} expression for the partition column, this
+  *         mapper's id for the sub-partition column, or {@code null} when
+  *         the column is not a pseudo-column
+  * @throws RuntimeException if the partition date-time value is missing
+  *         from the context
+  */
+ private String generateInsertValueForPseudoColumn(String columnName) {
+   boolean isPartitionColumn = columnName.equalsIgnoreCase(
+       OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_PARTITION);
+   if (isPartitionColumn) {
+     String partitionValueStr = context.getString(
+         OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE);
+     if (partitionValueStr == null) {
+       throw new RuntimeException(
+           "Unable to recall the value of the partition date-time.");
+     }
+     return String.format("to_date('%s', '%s')", partitionValueStr,
+         OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
+   }
+
+   boolean isSubPartitionColumn = columnName.equalsIgnoreCase(
+       OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_SUBPARTITION);
+   return isSubPartitionColumn ? Integer.toString(this.mapperId) : null;
+ }
+
+ /**
+  * @param columnName name of a table column
+  * @return the named bind variable for that column: the column name prefixed
+  *         with a colon
+  */
+ private String columnNameToBindVariable(String columnName) {
+   String bindVariable = ":" + columnName;
+   return bindVariable;
+ }
+
+ /**
+  * Binds one record to the prepared statement and adds it to the current
+  * batch.
+  *
+  * <p>When the table has the mapper-row pseudo-column, the current mapper
+  * row number is bound to it and then incremented.
+  *
+  * @param statement the batch insert statement
+  * @param columns schema columns; {@code columns[i]} names the value in
+  *        {@code array[i]}
+  * @param array the values of the record being loaded
+  * @throws SQLException if binding a value or adding the batch fails
+  * @throws RuntimeException if a schema column does not exist in the Oracle
+  *         table, or if the mapper row number cannot be bound
+  */
+ private void configurePreparedStatementColumns(
+     PreparedStatement statement, Column[] columns, Object[] array)
+     throws SQLException {
+
+   // Values are bound by name; the name is the column name itself, i.e. the
+   // bind variable used in the SQL text without its leading colon.
+   if (this.tableHasMapperRowNumberColumn) {
+     try {
+       OracleQueries.setLongAtName(statement,
+           OracleJdbcConnectorConstants.COLUMN_NAME_EXPORT_MAPPER_ROW,
+           this.mapperRowNumber);
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+     this.mapperRowNumber++;
+   }
+
+   for (int i = 0; i < array.length; i++) {
+     String colName = columns[i].getName();
+     OracleTableColumn oracleTableColumn =
+         tableColumns.findColumnByName(colName);
+     if (oracleTableColumn == null) {
+       // Fail with the column name rather than an anonymous
+       // NullPointerException while binding.
+       throw new RuntimeException(String.format(
+           "The column \"%s\" of the record being loaded does not exist in "
+               + "the Oracle table %s.", colName, this.table));
+     }
+     setBindValueAtName(statement, colName, array[i], oracleTableColumn);
+   }
+   statement.addBatch();
+ }
+
+ /**
+  * Binds a single value to the named bind variable, choosing the setter
+  * according to the Oracle type of the target column.
+  *
+  * @param statement the statement to bind on
+  * @param bindValueName bind variable name, without the leading colon
+  * @param bindValue the value to bind; may be {@code null}
+  * @param column the Oracle table column receiving the value
+  * @throws SQLException declared for the underlying bind calls
+  * @throws UnsupportedOperationException for a STRUCT column that is not a
+  *         URITYPE
+  */
+ private void setBindValueAtName(PreparedStatement statement,
+     String bindValueName, Object bindValue, OracleTableColumn column)
+     throws SQLException {
+   if (hasOracleType(column, "NUMBER")) {
+     OracleQueries.setBigDecimalAtName(statement, bindValueName,
+         (BigDecimal) bindValue);
+     return;
+   }
+
+   if (hasOracleType(column, "VARCHAR")) {
+     OracleQueries.setStringAtName(statement, bindValueName,
+         (String) bindValue);
+     return;
+   }
+
+   if (hasOracleType(column, "TIMESTAMP")
+       || hasOracleType(column, "TIMESTAMPTZ")
+       || hasOracleType(column, "TIMESTAMPLTZ")) {
+     if (bindValue instanceof LocalDateTime) {
+       //TODO: Improve date handling
+       LocalDateTime dateTime = (LocalDateTime) bindValue;
+       Timestamp timestampValue =
+           new Timestamp(dateTime.toDateTime().getMillis());
+       OracleQueries.setTimestampAtName(statement, bindValueName,
+           timestampValue);
+     } else {
+       // Anything else is bound as text; a missing value or the text "null"
+       // becomes the empty string.
+       String text = (String) bindValue;
+       if (text == null || text.equalsIgnoreCase("null")) {
+         text = "";
+       }
+       OracleQueries.setStringAtName(statement, bindValueName, text);
+     }
+     return;
+   }
+
+   if (hasOracleType(column, "BINARY_DOUBLE")) {
+     Double value = (Double) bindValue;
+     if (value == null) {
+       OracleQueries.setObjectAtName(statement, bindValueName, null);
+     } else {
+       OracleQueries.setBinaryDoubleAtName(statement, bindValueName,
+           value);
+     }
+     return;
+   }
+
+   if (hasOracleType(column, "BINARY_FLOAT")) {
+     Float value = (Float) bindValue;
+     if (value == null) {
+       OracleQueries.setObjectAtName(statement, bindValueName, null);
+     } else {
+       OracleQueries.setBinaryFloatAtName(statement, bindValueName,
+           value);
+     }
+     return;
+   }
+
+   if (hasOracleType(column, "STRUCT")) { // <- E.g. URITYPE
+     if (!column.getDataType().equals(
+         OracleJdbcConnectorConstants.Oracle.URITYPE)) {
+       String msg = String.format(
+           "%s needs to be updated to cope with the data-type: %s "
+               + "where the Oracle data_type is \"%s\".",
+           OracleUtilities.getCurrentMethodName(), column.getDataType(),
+           column.getOracleType());
+       LOG.error(msg);
+       throw new UnsupportedOperationException(msg);
+     }
+     OracleQueries.setStringAtName(statement, bindValueName,
+         (String) bindValue);
+     return;
+   }
+
+   // Any other type is bound as a plain object. LOB data-types are currently
+   // not supported during a Sqoop export (JIRA: SQOOP-117); the existing
+   // notes on this code say LOB columns will already have been excluded.
+   String msg = String.format(
+       "%s may need to be updated to cope with the data-type: %s",
+       OracleUtilities.getCurrentMethodName(), column.getOracleType());
+   LOG.debug(msg);
+
+   OracleQueries.setObjectAtName(statement, bindValueName, bindValue);
+ }
+
+ /**
+  * @param column the column to test
+  * @param oracleTypeName Oracle type name understood by
+  *        {@code OracleQueries.getOracleType}
+  * @return whether the column's Oracle type is the named type
+  */
+ private boolean hasOracleType(OracleTableColumn column,
+     String oracleTypeName) {
+   return column.getOracleType()
+       == OracleQueries.getOracleType(oracleTypeName);
+ }
+
+ /**
+  * Decides whether the APPEND_VALUES hint is usable for the current table.
+  *
+  * @return {@code true} when the database is Oracle 11.2 or above and the
+  *         table has no BINARY_DOUBLE or BINARY_FLOAT column
+  */
+ private boolean canUseOracleAppendValuesHint() {
+   // Should we use the APPEND_VALUES Oracle hint?...
+   // (Yes, if this is Oracle 11.2 or above)...
+   if (!oracleVersion.isGreaterThanOrEqualTo(11, 2, 0, 0)) {
+     return false;
+   }
+
+   // If there is a BINARY_DOUBLE or BINARY_FLOAT column, then we'll avoid
+   // using the APPEND_VALUES hint. If there is a NULL in the HDFS file, then
+   // we'll encounter
+   // "ORA-12838: cannot read/modify an object after modifying it in parallel"
+   // due to the JDBC driver issuing the INSERT statement twice to the
+   // database without a COMMIT in between (as was observed via WireShark).
+   // We're not sure why this happens - we just know how to avoid it.
+   boolean binaryFloatingPointColumnExists = false;
+   for (int idx = 0; idx < this.tableColumns.size(); idx++) {
+     OracleTableColumn oracleTableColumn = this.tableColumns.get(idx);
+     if (oracleTableColumn.getOracleType()
+             == OracleQueries.getOracleType("BINARY_DOUBLE")
+         || oracleTableColumn.getOracleType()
+             == OracleQueries.getOracleType("BINARY_FLOAT")) {
+       binaryFloatingPointColumnExists = true;
+     }
+   }
+
+   if (binaryFloatingPointColumnExists) {
+     LOG.info("The APPEND_VALUES Oracle hint will not be used for the "
+         + "INSERT SQL statement, as the Oracle table "
+         + "contains either a BINARY_DOUBLE or BINARY_FLOAT column.");
+     return false;
+   }
+   return true;
+ }
+
+ /**
+  * Applies the user's APPEND_VALUES preference on top of the automatically
+  * determined choice.
+  *
+  * @param jobConfig job configuration holding the user's preference
+  * @param useAppendValuesOracleHint the automatically determined choice
+  * @return {@code false} when forced OFF, {@code true} when forced ON,
+  *         otherwise the automatically determined choice
+  * @throws RuntimeException if the preference is none of OFF, ON or AUTO
+  */
+ protected boolean allowUserToOverrideUseOfTheOracleAppendValuesHint(
+     ToJobConfig jobConfig, boolean useAppendValuesOracleHint) {
+
+   // Has the user forced the use of APPEND_VALUES either on or off?...
+   switch (OracleUtilities.getOracleAppendValuesHintUsage(jobConfig)) {
+
+     case OFF:
+       LOG.debug(String.format(
+           "Use of the APPEND_VALUES Oracle hint has been forced OFF. "
+               + "(It was %s to used).",
+           useAppendValuesOracleHint ? "going" : "not going"));
+       return false;
+
+     case ON:
+       LOG.debug(String.format(
+           "Use of the APPEND_VALUES Oracle hint has been forced ON. "
+               + "(It was %s to used).",
+           useAppendValuesOracleHint ? "going" : "not going"));
+       return true;
+
+     case AUTO:
+       LOG.debug(String.format("The APPEND_VALUES Oracle hint %s be used.",
+           useAppendValuesOracleHint ? "will" : "will not"));
+       return useAppendValuesOracleHint;
+
+     default:
+       throw new RuntimeException("Invalid value for APPEND_VALUES.");
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
new file mode 100644
index 0000000..9aeacaf
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartition.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.job.etl.Partition;
+
+public class OracleJdbcPartition extends Partition {
+
+
+ // Identifier of this split; written by write() and restored by
+ // readFields().
+ private int splitId;
+ // Set through setTotalNumberOfBlocksInAllSplits(); not serialized by
+ // write().
+ private double totalNumberOfBlocksInAllSplits;
+ // Set through setSplitLocation(); not serialized by write().
+ private String splitLocation;
+ // The Oracle data-chunks of this split. May be null: readFields() stores
+ // null when the serialized chunk count is zero.
+ private List<OracleDataChunk> oracleDataChunks;
+
+ // NB: Review write(), readFields() and toString() if you add fields
+ // here.
+
+ /**
+  * Creates an empty partition: no data-chunks, a split id of -1 and an
+  * empty split location.
+  */
+ public OracleJdbcPartition() {
+   this.oracleDataChunks = new ArrayList<OracleDataChunk>();
+   this.splitLocation = "";
+   this.splitId = -1;
+ }
+
+ /**
+  * Creates a partition holding the given data-chunks. The other fields keep
+  * their Java default values.
+  *
+  * @param dataChunks the data-chunks of this split; the list is stored as
+  *        is, not copied
+  */
+ public OracleJdbcPartition(List<OracleDataChunk> dataChunks) {
+   this.setOracleDataChunks(dataChunks);
+ }
+
+ /**
+  * Replaces the data-chunks of this split.
+  *
+  * @param dataChunks the new data-chunks; the list is stored as is, not
+  *        copied
+  */
+ public void setOracleDataChunks(List<OracleDataChunk> dataChunks) {
+   oracleDataChunks = dataChunks;
+ }
+
+ /**
+  * @return the data-chunks of this split (the internal list itself), or
+  *         {@code null} if none are held
+  */
+ public List<OracleDataChunk> getDataChunks() {
+   return oracleDataChunks;
+ }
+
+ /**
+  * @return the number of data-chunks in this split; 0 when the chunk list
+  *         is {@code null}
+  */
+ public int getNumberOfDataChunks() {
+   return this.getDataChunks() == null ? 0 : this.getDataChunks().size();
+ }
+
+ /**
+ * @return The total number of blocks within the data-chunks of this split
+ */
+ public long getLength() {
+
+ return this.getTotalNumberOfBlocksInThisSplit();
+ }
+
+ /**
+  * @return the sum of the block counts of all data-chunks in this split;
+  *         0 when there are no data-chunks
+  */
+ public int getTotalNumberOfBlocksInThisSplit() {
+   int blockCount = 0;
+   // The count check also protects against a null chunk list.
+   if (this.getNumberOfDataChunks() != 0) {
+     for (OracleDataChunk dataChunk : this.getDataChunks()) {
+       blockCount += dataChunk.getNumberOfBlocks();
+     }
+   }
+   return blockCount;
+ }
+
+ /**
+  * Looks up a data-chunk of this split by its id.
+  *
+  * @param id the id to look for
+  * @return the first data-chunk whose id equals {@code id}, or {@code null}
+  *         if there is none (including when this split holds no chunk list)
+  */
+ public OracleDataChunk findDataChunkById(String id) {
+   List<OracleDataChunk> dataChunks = this.getDataChunks();
+   // The list is null after readFields() has read a split without chunks.
+   if (dataChunks == null) {
+     return null;
+   }
+
+   for (OracleDataChunk dataChunk : dataChunks) {
+     if (dataChunk.getId().equals(id)) {
+       return dataChunk;
+     }
+   }
+   return null;
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public void write(DataOutput output) throws IOException {
+
+ output.writeInt(splitId);
+
+ if (this.oracleDataChunks == null) {
+ output.writeInt(0);
+ } else {
+ output.writeInt(this.oracleDataChunks.size());
+ for (OracleDataChunk dataChunk : this.oracleDataChunks) {
+ output.writeUTF(dataChunk.getClass().getName());
+ dataChunk.write(output);
+ }
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Reads the split id and the data-chunks in the format produced by
+  * {@code write(DataOutput)}. A chunk count of zero leaves this split with
+  * no chunk list ({@code null}).
+  *
+  * @throws IOException if reading from {@code input} fails
+  * @throws RuntimeException if a data-chunk class named in the input cannot
+  *         be loaded or instantiated, or is not an {@code OracleDataChunk}
+  */
+ @Override
+ public void readFields(DataInput input) throws IOException {
+   this.splitId = input.readInt();
+
+   int dataChunkCount = input.readInt();
+   if (dataChunkCount == 0) {
+     this.oracleDataChunks = null;
+     return;
+   }
+
+   this.oracleDataChunks = new ArrayList<OracleDataChunk>(dataChunkCount);
+   for (int idx = 0; idx < dataChunkCount; idx++) {
+     // Read the name outside the try block so that an I/O failure surfaces
+     // as the declared IOException rather than being wrapped below.
+     String dataChunkClassName = input.readUTF();
+
+     OracleDataChunk dataChunk;
+     try {
+       dataChunk = Class.forName(dataChunkClassName)
+           .asSubclass(OracleDataChunk.class).newInstance();
+     } catch (Exception e) {
+       throw new RuntimeException(
+           "Unable to instantiate the Oracle data-chunk class: "
+               + dataChunkClassName, e);
+     }
+     dataChunk.readFields(input);
+     this.oracleDataChunks.add(dataChunk);
+   }
+ }
+
+ /**
+  * @return a description of this split: its id followed by the text of each
+  *         of its data-chunks, or a note that it has none
+  */
+ @Override
+ public String toString() {
+   if (this.getNumberOfDataChunks() == 0) {
+     return String.format(
+         "Split[%s] does not contain any Oracle data-chunks.", this.splitId);
+   }
+
+   StringBuilder description = new StringBuilder(String.format(
+       "Split[%s] includes the Oracle data-chunks:\n", this.splitId));
+   for (OracleDataChunk dataChunk : getDataChunks()) {
+     description.append(dataChunk.toString());
+   }
+   return description.toString();
+ }
+
+ /**
+  * @return the id of this split
+  */
+ protected int getSplitId() {
+   return splitId;
+ }
+
+ /**
+  * @param newSplitId the id to assign to this split
+  */
+ protected void setSplitId(int newSplitId) {
+   splitId = newSplitId;
+ }
+
+ /**
+  * @param newSplitLocation the location to record for this split
+  */
+ protected void setSplitLocation(String newSplitLocation) {
+   splitLocation = newSplitLocation;
+ }
+
+ /**
+  * @param newTotalNumberOfBlocksInAllSplits the block count summed over all
+  *        splits; stored in a {@code double} field
+  */
+ protected void setTotalNumberOfBlocksInAllSplits(
+     int newTotalNumberOfBlocksInAllSplits) {
+   totalNumberOfBlocksInAllSplits = newTotalNumberOfBlocksInAllSplits;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
new file mode 100644
index 0000000..00c7752
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcPartitioner.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleDataChunk;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+/**
+ * Partitioner for the FROM side of the Oracle JDBC connector.
+ *
+ * <p>It asks Oracle for the data-chunks of the source table (by partition or
+ * by extent, depending on the job configuration) and groups those chunks into
+ * at most {@code context.getMaxPartitions()} {@link OracleJdbcPartition}s.
+ */
+public class OracleJdbcPartitioner extends
+    Partitioner<LinkConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG =
+      Logger.getLogger(OracleJdbcPartitioner.class);
+
+  /**
+   * Builds the list of partitions (splits) for the configured Oracle table.
+   *
+   * <p>The connection opened here is only needed while the data-chunks are
+   * being queried, so it is always closed before this method returns.
+   *
+   * @param context supplies the maximum number of partitions wanted
+   * @param linkConfiguration connection settings for the Oracle database
+   * @param jobConfiguration FROM-job settings (table name, partition list...)
+   * @return the splits, each holding the data-chunks allocated to it
+   * @throws RuntimeException if the connection cannot be made, if no
+   *         data-chunks are found, or if a SQL error occurs
+   */
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context,
+      LinkConfiguration linkConfiguration,
+      FromJobConfiguration jobConfiguration) {
+
+    Connection connection;
+    try {
+      connection = OracleConnectionFactory.makeConnection(
+          linkConfiguration.connectionConfig);
+    } catch (SQLException ex) {
+      throw new RuntimeException(String.format(
+          "Unable to connect to the Oracle database at %s\nError:%s",
+          linkConfiguration.connectionConfig.connectionString, ex
+              .getMessage()), ex);
+    }
+
+    try {
+      OracleTable table = OracleUtilities.decodeOracleTableName(
+          linkConfiguration.connectionConfig.username,
+          jobConfiguration.fromJobConfig.tableName);
+
+      long desiredNumberOfMappers = context.getMaxPartitions();
+      List<String> partitionList = getPartitionList(
+          jobConfiguration.fromJobConfig);
+
+      OracleConnectionFactory.initializeOracleConnection(connection,
+          linkConfiguration.connectionConfig);
+
+      // The number of chunks generated will *not* be a multiple of the number
+      // of splits, to ensure that each split doesn't always get data from the
+      // start of each data-file...
+      long numberOfChunksPerOracleDataFile = (desiredNumberOfMappers * 2) + 1;
+
+      boolean chunkByPartition =
+          OracleUtilities.getOraOopOracleDataChunkMethod(
+              jobConfiguration.fromJobConfig).equals(
+              OracleUtilities.OracleDataChunkMethod.PARTITION);
+
+      // Get the Oracle data-chunks for the table...
+      List<? extends OracleDataChunk> dataChunks;
+      if (chunkByPartition) {
+        dataChunks =
+            OracleQueries.getOracleDataChunksPartition(connection, table,
+                partitionList);
+      } else {
+        dataChunks =
+            OracleQueries.getOracleDataChunksExtent(connection, table,
+                partitionList, numberOfChunksPerOracleDataFile);
+      }
+
+      if (dataChunks.size() == 0) {
+        String errMsg;
+        if (chunkByPartition) {
+          errMsg =
+              String.format(
+                  "The table %s does not contain any partitions and you "
+                      + "have specified to chunk the table by partitions.",
+                  table.getName());
+        } else {
+          errMsg =
+              String.format("The table %s does not contain any data.", table
+                  .getName());
+        }
+        LOG.fatal(errMsg);
+        throw new RuntimeException(errMsg);
+      }
+
+      OracleUtilities.OracleBlockToSplitAllocationMethod
+          blockAllocationMethod = OracleUtilities
+              .getOracleBlockToSplitAllocationMethod(
+                  jobConfiguration.fromJobConfig,
+                  OracleUtilities.
+                      OracleBlockToSplitAllocationMethod.ROUNDROBIN);
+
+      // Group the Oracle data-chunks into splits...
+      // TODO: assigning a preferred location to each split (the
+      // "oraoop.locations" setting) is not implemented here.
+      return groupTableDataChunksIntoSplits(dataChunks,
+          desiredNumberOfMappers, blockAllocationMethod);
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      closeConnection(connection);
+    }
+  }
+
+  /**
+   * Closes the connection used for partitioning. A failure to close is only
+   * logged so that it cannot hide the outcome of the partitioning itself.
+   */
+  private static void closeConnection(Connection connection) {
+    if (connection == null) {
+      return;
+    }
+    try {
+      connection.close();
+    } catch (SQLException ex) {
+      LOG.warn("Unable to close the Oracle connection used for partitioning.",
+          ex);
+    }
+  }
+
+  /**
+   * Splits the configured partition list string into its entries and logs
+   * them at debug level.
+   *
+   * @param jobConfig the FROM-job settings holding the partition list
+   * @return whatever {@code OracleUtilities.splitOracleStringList} returns
+   *         for the configured value
+   */
+  private List<String> getPartitionList(FromJobConfig jobConfig) {
+    LOG.debug("Partition list = " + jobConfig.partitionList);
+    List<String> result =
+        OracleUtilities.splitOracleStringList(jobConfig.partitionList);
+    if (result != null && result.size() > 0) {
+      LOG.debug("Partition filter list: " + result.toString());
+    }
+    return result;
+  }
+
+  /**
+   * Distributes the data-chunks over
+   * {@code min(dataChunks.size(), desiredNumberOfSplits)} splits.
+   *
+   * <ul>
+   * <li>ROUNDROBIN: chunk i goes to split (i mod number-of-splits);</li>
+   * <li>RANDOM: {@code dataChunks} is shuffled in place, then allocated
+   * round-robin;</li>
+   * <li>SEQUENTIAL: consecutive runs of chunks go to consecutive splits.</li>
+   * </ul>
+   *
+   * @param dataChunks the chunks to allocate (reordered in place for RANDOM)
+   * @param desiredNumberOfSplits the upper bound on the number of splits
+   * @param blockAllocationMethod how chunks are assigned to splits
+   * @return the splits, each an {@link OracleJdbcPartition}
+   * @throws RuntimeException if the allocation method is not supported
+   */
+  protected static
+      List<Partition> groupTableDataChunksIntoSplits(
+          List<? extends OracleDataChunk> dataChunks,
+          long desiredNumberOfSplits,
+          OracleUtilities.OracleBlockToSplitAllocationMethod
+              blockAllocationMethod) {
+
+    long numberOfDataChunks = dataChunks.size();
+    long actualNumberOfSplits =
+        Math.min(numberOfDataChunks, desiredNumberOfSplits);
+    long totalNumberOfBlocksInAllDataChunks = 0;
+    for (OracleDataChunk dataChunk : dataChunks) {
+      totalNumberOfBlocksInAllDataChunks += dataChunk.getNumberOfBlocks();
+    }
+
+    String debugMsg = String.format(
+        "The table being imported by sqoop has %d blocks "
+            + "that have been divided into %d chunks "
+            + "which will be processed in %d splits. "
+            + "The chunks will be allocated to the splits using the method : %s",
+        totalNumberOfBlocksInAllDataChunks, numberOfDataChunks,
+        actualNumberOfSplits, blockAllocationMethod.toString());
+    LOG.info(debugMsg);
+
+    List<Partition> splits =
+        new ArrayList<Partition>((int) actualNumberOfSplits);
+
+    // TODO: the split id and the total block count are not recorded on the
+    // partitions created here.
+    for (int i = 0; i < actualNumberOfSplits; i++) {
+      splits.add(new OracleJdbcPartition());
+    }
+
+    switch (blockAllocationMethod) {
+
+      case RANDOM:
+        // Randomize the order of the data chunks and then "fall through" into
+        // the ROUNDROBIN block below...
+        Collections.shuffle(dataChunks);
+
+        // NB: No "break;" statement here - we're intentionally falling into
+        // the ROUNDROBIN block below...
+
+        //$FALL-THROUGH$
+      case ROUNDROBIN:
+        int idxSplitRoundRobin = 0;
+        for (OracleDataChunk dataChunk : dataChunks) {
+
+          if (idxSplitRoundRobin >= splits.size()) {
+            idxSplitRoundRobin = 0;
+          }
+          OracleJdbcPartition split =
+              (OracleJdbcPartition) splits.get(idxSplitRoundRobin++);
+
+          split.getDataChunks().add(dataChunk);
+        }
+        break;
+
+      case SEQUENTIAL:
+        double dataChunksPerSplit = dataChunks.size() / (double) splits.size();
+        int dataChunksAllocatedToSplits = 0;
+
+        int idxSplitSeq = 0;
+        for (OracleDataChunk dataChunk : dataChunks) {
+
+          OracleJdbcPartition split =
+              (OracleJdbcPartition) splits.get(idxSplitSeq);
+          split.getDataChunks().add(dataChunk);
+
+          dataChunksAllocatedToSplits++;
+
+          // Only advance while there is a further split to advance to, so the
+          // index can never step past the last split.
+          if (dataChunksAllocatedToSplits
+              >= (dataChunksPerSplit * (idxSplitSeq + 1))
+              && idxSplitSeq < splits.size() - 1) {
+            idxSplitSeq++;
+          }
+        }
+        break;
+
+      default:
+        throw new RuntimeException("Block allocation method not implemented.");
+
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (int idx = 0; idx < splits.size(); idx++) {
+        LOG.debug("\n\t"
+            + splits.get(idx).toString());
+      }
+    }
+
+    return splits;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
new file mode 100644
index 0000000..8429a38
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToDestroyer.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries.CreateExportChangesTableOptions;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTableColumns;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.ExportTableUpdateTechnique;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.InsertMode;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class OracleJdbcToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+
+ private static final Logger LOG =
+ Logger.getLogger(OracleJdbcToDestroyer.class);
+
+ protected Connection connection;
+ protected OracleTable table;
+ protected int numMappers = 8;
+
+ private void connect(ConnectionConfig connectionConfig) {
+ try {
+ connection = OracleConnectionFactory.makeConnection(connectionConfig);
+ } catch (SQLException ex) {
+ throw new RuntimeException(String.format(
+ "Unable to connect to the Oracle database at %s\n"
+ + "Error:%s", connectionConfig.connectionString, ex
+ .getMessage()), ex);
+ }
+ }
+
+ @Override
+ public void destroy(DestroyerContext context,
+ LinkConfiguration linkConfiguration,
+ ToJobConfiguration jobConfiguration) {
+ LOG.debug("Running Oracle JDBC connector destroyer");
+
+ table = OracleUtilities.decodeOracleTableName(
+ linkConfiguration.connectionConfig.username,
+ jobConfiguration.toJobConfig.tableName);
+
+ if (jobConfiguration.toJobConfig.updateKey == null ||
+ jobConfiguration.toJobConfig.updateKey.isEmpty()) {
+
+ // Is each mapper inserting rows into a unique table?...
+ InsertMode insertMode = OracleUtilities.getExportInsertMode(
+ jobConfiguration.toJobConfig, context.getContext());
+
+ if(insertMode==InsertMode.ExchangePartition) {
+ connect(linkConfiguration.connectionConfig);
+ Object sysDateTime =
+ OracleUtilities.recallOracleDateTime(context.getContext());
+
+ exchangePartitionUniqueMapperTableDataIntoMainExportTable(sysDateTime);
+
+ }
+
+ } else {
+ connect(linkConfiguration.connectionConfig);
+ Object sysDateTime =
+ OracleUtilities.recallOracleDateTime(context.getContext());
+ try {
+ updateMainExportTableFromUniqueMapperTable(jobConfiguration.toJobConfig,
+ context.getContext(), sysDateTime);
+ } catch(SQLException e) {
+ throw new RuntimeException(
+ String.format(
+ "Unable to update the table %s.",table.toString()), e);
+ }
+ }
+ }
+
+ private void exchangePartitionUniqueMapperTableDataIntoMainExportTable(
+ Object sysDateTime) {
+
+ for(int i=0; i<numMappers; i++) {
+ long start = System.nanoTime();
+
+ OracleTable mapperTable =
+ OracleUtilities.generateExportTableMapperTableName(
+ i, sysDateTime, null);
+
+ String subPartitionName =
+ OracleUtilities.generateExportTableSubPartitionName(
+ i, sysDateTime);
+
+ try {
+ OracleQueries.exchangeSubpartition(connection,
+ table, subPartitionName, mapperTable);
+
+ double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String
+ .format(
+ "Time spent performing an \"exchange subpartition with "
+ + "table\": %f sec.",
+ timeInSec));
+
+ LOG.debug(String.format("Dropping temporary mapper table %s",
+ mapperTable.toString()));
+ OracleQueries.dropTable(connection, mapperTable);
+ } catch (SQLException ex) {
+ throw new RuntimeException(
+ String
+ .format(
+ "Unable to perform an \"exchange subpartition\" operation "
+ + "for the table %s, for the subpartition named "
+ + "\"%s\" with the table named \"%s\".",
+ table.toString(), subPartitionName,
+ mapperTable.toString()), ex);
+ }
+ }
+ }
+
+ private void updateMainExportTableFromUniqueMapperTable(ToJobConfig jobConfig,
+ ImmutableContext context, Object sysDateTime)
+ throws SQLException {
+
+ String[] updateColumnNames = OracleUtilities.
+ getExportUpdateKeyColumnNames(jobConfig);
+
+ OracleTableColumns tableColumns = OracleQueries.getToTableColumns(
+ connection, table, true, false);
+
+ UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);
+
+ ExportTableUpdateTechnique exportTableUpdateTechnique =
+ OracleUtilities.getExportTableUpdateTechnique(context, updateMode);
+
+ CreateExportChangesTableOptions changesTableOptions;
+ boolean parallelizationEnabled =
+ OracleUtilities.enableOracleParallelProcessingDuringExport(jobConfig);
+
+ switch (exportTableUpdateTechnique) {
+
+ case ReInsertUpdatedRows:
+ case UpdateSql:
+ changesTableOptions =
+ CreateExportChangesTableOptions.OnlyRowsThatDiffer;
+ break;
+
+ case ReInsertUpdatedRowsAndNewRows:
+ case MergeSql:
+ changesTableOptions =
+ CreateExportChangesTableOptions.RowsThatDifferPlusNewRows;
+ break;
+
+ default:
+ throw new RuntimeException(String.format(
+ "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+ OracleUtilities.getCurrentMethodName(),
+ exportTableUpdateTechnique.toString()));
+ }
+
+ String temporaryTableStorageClause =
+ OracleUtilities.getTemporaryTableStorageClause(jobConfig);
+
+ for(int i=0; i<numMappers; i++) {
+
+ OracleTable mapperTable =
+ OracleUtilities.generateExportTableMapperTableName(
+ i, sysDateTime, null);
+
+ OracleTable changesTable =
+ OracleUtilities.generateExportTableMapperTableName(Integer
+ .toString(i) + "_CHG", sysDateTime, null);
+
+ try {
+ int changeTableRowCount =
+ OracleQueries.createExportChangesTable(connection,
+ changesTable, temporaryTableStorageClause, mapperTable,
+ table, updateColumnNames, changesTableOptions,
+ parallelizationEnabled);
+
+ if (changeTableRowCount == 0) {
+ LOG.debug(String.format(
+ "The changes-table does not contain any rows. %s is now exiting.",
+ OracleUtilities.getCurrentMethodName()));
+ continue;
+ }
+
+ switch (exportTableUpdateTechnique) {
+
+ case ReInsertUpdatedRows:
+ case ReInsertUpdatedRowsAndNewRows:
+
+ OracleQueries.deleteRowsFromTable(connection,
+ table, changesTable, updateColumnNames,
+ parallelizationEnabled);
+
+ OracleQueries.insertRowsIntoExportTable(connection,
+ table, changesTable, sysDateTime, i,
+ parallelizationEnabled);
+ break;
+
+ case UpdateSql:
+
+ long start = System.nanoTime();
+
+ OracleQueries.updateTable(connection, table,
+ changesTable, updateColumnNames, tableColumns, sysDateTime, i,
+ parallelizationEnabled);
+
+ double timeInSec = (System.nanoTime() - start) / Math.pow(10, 9);
+ LOG.info(String.format("Time spent performing an update: %f sec.",
+ timeInSec));
+ break;
+
+ case MergeSql:
+
+ long mergeStart = System.nanoTime();
+
+ OracleQueries.mergeTable(connection, table,
+ changesTable, updateColumnNames, tableColumns, sysDateTime,
+ i, parallelizationEnabled);
+
+ double mergeTimeInSec = (System.nanoTime() - mergeStart)
+ / Math.pow(10, 9);
+ LOG.info(String.format("Time spent performing a merge: %f sec.",
+ mergeTimeInSec));
+
+ break;
+
+ default:
+ throw new RuntimeException(
+ String.format(
+ "Update %s to cater for the ExportTableUpdateTechnique \"%s\".",
+ OracleUtilities.getCurrentMethodName(),
+ exportTableUpdateTechnique.toString()));
+ }
+
+ connection.commit();
+ } catch (SQLException ex) {
+ connection.rollback();
+ throw ex;
+ } finally {
+ OracleQueries.dropTable(connection, changesTable);
+ LOG.debug(String.format("Dropping temporary mapper table %s",
+ mapperTable.toString()));
+ OracleQueries.dropTable(connection, mapperTable);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
new file mode 100644
index 0000000..f1d92f0
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/OracleJdbcToInitializer.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ConnectionConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfig;
+import org.apache.sqoop.connector.jdbc.oracle.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleQueries;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTable;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartition;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleTablePartitions;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities;
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleUtilities.UpdateMode;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+public class OracleJdbcToInitializer extends
+ OracleJdbcCommonInitializer<ToJobConfiguration> {
+
+ private static final Logger LOG =
+ Logger.getLogger(OracleJdbcToInitializer.class);
+
+ @Override
+ public void connect(InitializerContext context,
+ LinkConfiguration linkConfiguration, ToJobConfiguration jobConfiguration)
+ throws SQLException {
+ super.connect(context, linkConfiguration, jobConfiguration);
+ table = OracleUtilities.decodeOracleTableName(
+ linkConfiguration.connectionConfig.username,
+ jobConfiguration.toJobConfig.tableName);
+ }
+
+  /**
+   * Runs the common initialization, creates any Oracle objects the export
+   * needs, and then checks that the target is an Oracle table.
+   *
+   * @throws RuntimeException if the target is not an Oracle table, or
+   *         wrapping any {@link SQLException} raised along the way
+   */
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfiguration,
+      ToJobConfiguration jobConfiguration) {
+    super.initialize(context, linkConfiguration, jobConfiguration);
+    LOG.debug("Running Oracle JDBC connector initializer");
+
+    boolean targetIsOracleTable;
+    try {
+      createAnyRequiredOracleObjects(context.getContext(),
+          jobConfiguration.toJobConfig, linkConfiguration.connectionConfig);
+
+      targetIsOracleTable = isSqoopTableAnOracleTable(connection,
+          linkConfiguration.connectionConfig.username, table);
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    if (!targetIsOracleTable) {
+      throw new RuntimeException("Can only load data into Oracle tables.");
+    }
+  }
+
+  /**
+   * Validates the export settings and creates the Oracle objects the export
+   * job needs before any mapper runs.
+   *
+   * <p>In order, this method:
+   * <ol>
+   * <li>records the database SYSDATE and the derived partition value in the
+   * job context;</li>
+   * <li>looks for old temporary tables left behind by earlier jobs;</li>
+   * <li>validates the combination of update key, update mode and template
+   * table settings;</li>
+   * <li>creates the export table from the template (partitioned or not), or
+   * adds a new partition to an existing table whose partitions were created
+   * by this connector;</li>
+   * <li>creates one temporary table per mapper when the export needs
+   * them.</li>
+   * </ol>
+   *
+   * @param context the mutable job context that receives the recorded values
+   * @param jobConfig the TO-job settings
+   * @param connectionConfig the link's connection settings
+   * @throws SQLException if any of the Oracle queries fails
+   * @throws RuntimeException if the settings are invalid or a required
+   *         object is missing or already exists
+   */
+  private void createAnyRequiredOracleObjects(MutableContext context,
+      ToJobConfig jobConfig, ConnectionConfig connectionConfig)
+      throws SQLException {
+
+    // The SYSDATE on the Oracle database will be used as the partition value
+    // for this export job...
+    Object sysDateTime = OracleQueries.getSysDate(connection);
+    String sysDateStr =
+        OracleQueries.oraDATEToString(sysDateTime, "yyyy-mm-dd hh24:mi:ss");
+    context.setString(OracleJdbcConnectorConstants.SQOOP_ORACLE_JOB_SYSDATE,
+        sysDateStr);
+
+    checkForOldOraOopTemporaryOracleTables(connection, sysDateTime,
+        OracleQueries.getCurrentSchema(connection));
+
+    // Store the actual partition value, so the N mappers know what value to
+    // insert...
+    String partitionValue =
+        OracleQueries.oraDATEToString(sysDateTime,
+            OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
+    context.setString(
+        OracleJdbcConnectorConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE,
+        partitionValue);
+
+    // Generate the (22 character) partition name...
+    String partitionName =
+        OracleUtilities
+            .createExportTablePartitionNameFromOracleTimestamp(sysDateTime);
+
+    //TODO: Number of mappers needs to be fixed
+    int numMappers = 8;
+
+    String exportTableTemplate = jobConfig.templateTable;
+
+    if (exportTableTemplate == null) {
+      exportTableTemplate = "";
+    }
+
+    String user = connectionConfig.username;
+    //TODO: The other Oracle manager falls back to the session user when no
+    //      user name is configured; that is not done here.
+
+    OracleTable templateTableContext =
+        OracleUtilities.decodeOracleTableName(user, exportTableTemplate);
+
+    boolean noLoggingOnNewTable = BooleanUtils.isTrue(jobConfig.nologging);
+
+    String updateKeyCol = jobConfig.updateKey;
+
+    // An update key turns this into an "update" export; without one it is an
+    // "insert" export.
+    boolean hasUpdateKey = updateKeyCol != null && !updateKeyCol.isEmpty();
+
+    /* =========================== */
+    /* VALIDATION OF INPUTS */
+    /* =========================== */
+
+    if (!hasUpdateKey) {
+      // We're performing an "insert" export, not an "update" export.
+
+      // Check that the merge option has not been specified, as this would be
+      // an invalid scenario...
+      if (OracleUtilities.getExportUpdateMode(jobConfig) == UpdateMode.Merge) {
+        throw new RuntimeException("The merge option can only be used if "
+            + "an update key is specified.");
+      }
+    }
+
+    if (OracleUtilities
+        .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)
+        || OracleUtilities
+            .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {
+
+      // OraOop will create the export table.
+
+      if (table.getName().length()
+          > OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
+        String msg =
+            String.format(
+                "The Oracle table name \"%s\" is longer than %d characters.\n"
+                    + "Oracle will not allow a table with this name to be created.",
+                table.getName(),
+                OracleJdbcConnectorConstants.Oracle.MAX_IDENTIFIER_LENGTH);
+        throw new RuntimeException(msg);
+      }
+
+      if (hasUpdateKey) {
+
+        // We're performing an "update" export, not an "insert" export.
+
+        // Check whether the user is attempting an "update" (i.e. a
+        // non-merge). If so, they're asking to only UPDATE rows in a (about
+        // to be created) (empty) table that contains no rows. This will be a
+        // waste of time, as we'd be attempting to perform UPDATE operations
+        // against a table with no rows in it...
+        UpdateMode updateMode = OracleUtilities.getExportUpdateMode(jobConfig);
+        if (updateMode == UpdateMode.Update) {
+          // The rejected combination is template table + update (not merge),
+          // so the message names the update option.
+          throw new RuntimeException(
+              "\n\nCombining the template table option with the update "
+                  + "(non-merge) option is nonsensical, as this would create an "
+                  + "empty table and then perform "
+                  + "a lot of work that results in a table containing no rows.\n");
+        }
+      }
+
+      // Check that the specified template table actually exists and is a
+      // table...
+      String templateTableObjectType =
+          OracleQueries.getOracleObjectType(connection,
+              templateTableContext);
+      if (templateTableObjectType == null) {
+        throw new RuntimeException(String.format(
+            "The specified Oracle template table \"%s\" does not exist.",
+            templateTableContext.toString()));
+      }
+
+      if (!templateTableObjectType.equalsIgnoreCase(
+          OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE)) {
+        throw new RuntimeException(
+            String.format(
+                "The specified Oracle template table \"%s\" is not an "
+                    + "Oracle table, it's a %s.",
+                templateTableContext.toString(), templateTableObjectType));
+      }
+
+      if (BooleanUtils.isTrue(jobConfig.dropTableIfExists)) {
+        OracleQueries.dropTable(connection, table);
+      }
+
+      // Check that there is no existing database object with the same name of
+      // the table to be created...
+      String newTableObjectType =
+          OracleQueries.getOracleObjectType(connection, table);
+      if (newTableObjectType != null) {
+        throw new RuntimeException(
+            String.format(
+                "%s cannot create a new Oracle table named %s as a \"%s\" "
+                    + "with this name already exists.",
+                OracleJdbcConnectorConstants.CONNECTOR_NAME, table.toString(),
+                newTableObjectType));
+      }
+    } else {
+      // The export table already exists.
+
+      if (hasUpdateKey) {
+
+        // We're performing an "update" export, not an "insert" export.
+
+        // Check that there exists an index on the export table on the
+        // update-key column(s).
+        // Without such an index, this export may perform like a real dog...
+        String[] updateKeyColumns =
+            OracleUtilities.getExportUpdateKeyColumnNames(jobConfig);
+        if (!OracleQueries.doesIndexOnColumnsExist(connection,
+            table, updateKeyColumns)) {
+          String msg = String.format(
+              "\n**************************************************************"
+                  + "***************************************************************"
+                  + "\n\tThe table %1$s does not have a valid index on "
+                  + "the column(s) %2$s.\n"
+                  + "\tAs a consequence, this export may take a long time to "
+                  + "complete.\n"
+                  + "\tIf performance is unacceptable, consider reattempting this "
+                  + "job after creating an index "
+                  + "on this table via the SQL...\n"
+                  + "\t\tcreate index <index_name> on %1$s(%2$s);\n"
+                  + "****************************************************************"
+                  + "*************************************************************",
+              table.toString(),
+              OracleUtilities.stringArrayToCSV(updateKeyColumns));
+          LOG.warn(msg);
+        }
+      }
+    }
+
+    // An "update" export always stages its rows in per-mapper tables.
+    boolean createMapperTables = hasUpdateKey;
+
+    if (OracleUtilities
+        .userWantsToCreatePartitionedExportTableFromTemplate(jobConfig)) {
+      /* ================================= */
+      /* CREATE A PARTITIONED TABLE */
+      /* ================================= */
+
+      // Create a new Oracle table using the specified template...
+
+      String[] subPartitionNames =
+          OracleUtilities.generateExportTableSubPartitionNames(numMappers,
+              sysDateTime);
+      // Create the export table from a template table...
+      String tableStorageClause =
+          OracleUtilities.getExportTableStorageClause(jobConfig);
+
+      OracleQueries.createExportTableFromTemplateWithPartitioning(
+          connection, table,
+          tableStorageClause, templateTableContext, noLoggingOnNewTable,
+          partitionName, sysDateTime, numMappers,
+          subPartitionNames);
+
+      createMapperTables = true;
+    } else if (OracleUtilities
+        .userWantsToCreateNonPartitionedExportTableFromTemplate(jobConfig)) {
+      /* ===================================== */
+      /* CREATE A NON-PARTITIONED TABLE */
+      /* ===================================== */
+      String tableStorageClause =
+          OracleUtilities.getExportTableStorageClause(jobConfig);
+
+      OracleQueries.createExportTableFromTemplate(connection,
+          table, tableStorageClause,
+          templateTableContext, noLoggingOnNewTable);
+    } else {
+      /* ===================================================== */
+      /* ADD ADDITIONAL PARTITIONS TO AN EXISTING TABLE */
+      /* ===================================================== */
+
+      // If the export table is partitioned, and the partitions were created
+      // by OraOop, then we need to create additional partitions...
+
+      OracleTablePartitions tablePartitions =
+          OracleQueries.getPartitions(connection, table);
+      // Find any partition name starting with the connector's export
+      // partition-name prefix...
+      OracleTablePartition oraOopPartition =
+          tablePartitions.findPartitionByRegEx("^"
+              + OracleJdbcConnectorConstants.
+                  EXPORT_TABLE_PARTITION_NAME_PREFIX);
+
+      if (tablePartitions.size() > 0 && oraOopPartition == null) {
+
+        for (int idx = 0; idx < tablePartitions.size(); idx++) {
+          LOG.info(String.format(
+              "The Oracle table %s has a partition named \"%s\".",
+              table.toString(),
+              tablePartitions.get(idx).getName()));
+        }
+
+        LOG.warn(String.format(
+            "The Oracle table %s is partitioned.\n"
+                + "These partitions were not created by %s.",
+            table.toString(),
+            OracleJdbcConnectorConstants.CONNECTOR_NAME));
+      }
+
+      if (oraOopPartition != null) {
+
+        // Indicate in the configuration what's happening...
+        context.setBoolean(OracleJdbcConnectorConstants.
+            EXPORT_TABLE_HAS_SQOOP_PARTITIONS, true);
+
+        LOG.info(String.format(
+            "The Oracle table %s is partitioned.\n"
+                + "These partitions were created by %s, so "
+                + "additional partitions will now be created.\n"
+                + "The name of the new partition will be \"%s\".",
+            table.toString(), OracleJdbcConnectorConstants.
+                CONNECTOR_NAME, partitionName));
+
+        String[] subPartitionNames =
+            OracleUtilities.generateExportTableSubPartitionNames(numMappers,
+                sysDateTime);
+
+        // Add another partition (and N subpartitions) to this existing,
+        // partitioned export table...
+        OracleQueries.createMoreExportTablePartitions(connection,
+            table, partitionName,
+            sysDateTime, subPartitionNames);
+
+        createMapperTables = true;
+      }
+    }
+
+    if (createMapperTables) {
+      createUniqueMapperTable(sysDateTime, numMappers, jobConfig);
+    }
+  }
+
+ /**
+  * Creates one temporary Oracle table for each mapper index in
+  * {@code [0, numMappers)}.
+  * <p>
+  * Mappers insert data into a unique table before either exchanging it into a
+  * subpartition of the 'real' export table, or merging it into the 'real'
+  * export table.
+  *
+  * @param sysDateTime value handed to the mapper-table name generator
+  * @param numMappers number of mapper tables to (re)create
+  * @param jobConfig job configuration from which the temporary table storage
+  *        clause is obtained
+  * @throws SQLException propagated from the drop/create table calls
+  */
+ private void createUniqueMapperTable(Object sysDateTime,
+     int numMappers, ToJobConfig jobConfig)
+     throws SQLException {
+
+   for (int mapperId = 0; mapperId < numMappers; mapperId++) {
+     OracleTable tempTable = OracleUtilities
+         .generateExportTableMapperTableName(mapperId, sysDateTime, null);
+
+     // A previous (failed) attempt may have left a table with this name
+     // behind, so drop it before creating it again.
+     OracleQueries.dropTable(connection, tempTable);
+
+     String storageClause =
+         OracleUtilities.getTemporaryTableStorageClause(jobConfig);
+
+     // The trailing 'false' is the addOraOopPartitionColumns argument.
+     OracleQueries.createExportTableForMapper(connection, tempTable,
+         storageClause, table, false);
+
+     String createdMessage = String.format(
+         "Created temporary mapper table %s", tempTable.toString());
+     LOG.debug(createdMessage);
+   }
+ }
+
+ private void checkForOldOraOopTemporaryOracleTables(Connection connection,
+ Object sysDateTime, String schema) {
+
+ try {
+
+ StringBuilder message = new StringBuilder();
+ message
+ .append(String.format(
+ "The following tables appear to be old temporary tables created by "
+ + "%s that have not been deleted.\n"
+ + "They are probably left over from jobs that encountered an error and "
+ + "could not clean up after themselves.\n"
+ + "You might want to drop these Oracle tables in order to reclaim "
+ + "Oracle storage space:\n",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME));
+ boolean showMessage = false;
+
+ String generatedTableName =
+ OracleUtilities.generateExportTableMapperTableName(0, sysDateTime,
+ schema).getName();
+ generatedTableName = generatedTableName.replaceAll("[0-9]", "%");
+ generatedTableName =
+ OracleUtilities.replaceAll(generatedTableName, "%%", "%");
+ Date sysDate = OracleQueries.oraDATEToDate(sysDateTime);
+
+ List<OracleTable> tables =
+ OracleQueries.getTablesWithTableNameLike(connection, schema,
+ generatedTableName);
+
+ for (OracleTable oracleTable : tables) {
+ OracleUtilities.DecodedExportMapperTableName tableName =
+ OracleUtilities.decodeExportTableMapperTableName(oracleTable);
+ if (tableName != null) {
+ Date tableDate =
+ OracleQueries.oraDATEToDate(tableName.getTableDateTime());
+ double daysApart =
+ (sysDate.getTime() - tableDate.getTime()) / (1000 * 60 * 60 * 24);
+ if (daysApart > 1.0) {
+ showMessage = true;
+ message.append(String.format("\t%s\n", oracleTable.toString()));
+ }
+ }
+ }
+
+ if (showMessage) {
+ LOG.info(message.toString());
+ }
+ } catch (Exception ex) {
+ LOG.warn(String.format(
+ "%s was unable to check for the existance of old "
+ + "temporary Oracle tables.\n" + "Error:\n%s",
+ OracleJdbcConnectorConstants.CONNECTOR_NAME, ex.toString()));
+ }
+ }
+
+ /**
+  * Decides whether the object named by {@code tableContext} is an Oracle
+  * table.
+  * <p>
+  * Returns {@code true} when the table lookup finds it, when the reported
+  * object type equals the TABLE object type (ignoring case), or when the
+  * lookup fails with a {@link SQLException} (the object is then assumed to be
+  * a table). Returns {@code false}, after logging the reason, when no object
+  * type can be determined or the object is of some other type.
+  *
+  * @param connection connection used for the lookups
+  * @param connectionUserName user name shown in the hint that is logged when
+  *        the table cannot be found
+  * @param tableContext schema and name of the object to check
+  * @return whether the connector should treat the object as an Oracle table
+  */
+ private boolean isSqoopTableAnOracleTable(Connection connection,
+     String connectionUserName, OracleTable tableContext) {
+
+   String objectType;
+
+   try {
+     // Find the table via dba_tables; a hit settles the question.
+     OracleTable foundTable = OracleQueries.getTable(connection,
+         tableContext.getSchema(), tableContext.getName());
+     if (foundTable != null) {
+       return true;
+     }
+
+     // Not found as a table: ask what kind of database object the user was
+     // referring to. Perhaps they've specified the name of a view?
+     objectType = OracleQueries.getOracleObjectType(connection, tableContext);
+   } catch (SQLException ex) {
+     LOG.warn(String.format(
+         "Unable to determine the Oracle-type of the object named %s owned by "
+         + "%s.\nError:\n" + "%s", tableContext.getName(), tableContext
+         .getSchema(), ex.getMessage()));
+
+     // In the absence of conflicting information, assume the object is
+     // actually a table.
+     return true;
+   }
+
+   if (objectType == null) {
+     LOG.info(String.format(
+         "%1$s will not process this Sqoop connection, "
+         + "as the Oracle user %2$s does not own a table named %3$s.\n"
+         + "\tPlease prefix the table name with the owner.\n "
+         + "\tNote: You may need to double-quote the owner and/or table name."
+         + "\n\tE.g. sqoop ... --username %4$s --table %2$s.%3$s\n",
+         OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.getSchema(),
+         tableContext.getName(), connectionUserName));
+     return false;
+   }
+
+   if (objectType.equalsIgnoreCase(
+       OracleJdbcConnectorConstants.Oracle.OBJECT_TYPE_TABLE)) {
+     return true;
+   }
+
+   LOG.info(String.format("%s will not process this sqoop connection, "
+       + "as %s is not an Oracle table, it's a %s.",
+       OracleJdbcConnectorConstants.CONNECTOR_NAME, tableContext.toString(),
+       objectType));
+   return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
new file mode 100644
index 0000000..c355a77
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ConnectionConfig.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.connector.jdbc.oracle.util.OracleConnectionFactory;
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.StartsWith;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ *
+ */
+@ConfigClass(validators = {@Validator(ConnectionConfig.ConfigValidator.class)})
+public class ConnectionConfig {
+ @Input(size = 128, validators = {@Validator(value = StartsWith.class, strArg = "jdbc:")} )
+ public String connectionString;
+
+ @Input(size = 40)
+ public String username;
+
+ @Input(size = 40, sensitive = true)
+ public String password;
+
+ @Input
+ public Map<String, String> jdbcProperties;
+
+ @Input
+ public String timeZone;
+
+ @Input
+ public String actionName;
+
+ @Input
+ public Integer fetchSize;
+
+ @Input
+ public String initializationStatements;
+
+ @Input
+ public Boolean jdbcUrlVerbatim;
+
+ @Input
+ public String racServiceName;
+
+ public static class ConfigValidator extends AbstractValidator<ConnectionConfig> {
+ @Override
+ public void validate(ConnectionConfig linkConfig) {
+ // See if we can connect to the database
+ try {
+ OracleConnectionFactory.makeConnection(linkConfig);
+ } catch (SQLException e) {
+ addMessage(Status.WARNING, "Can't connect to the database with given credentials: " + e.getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
new file mode 100644
index 0000000..38c808f
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfig.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Inputs of the FROM-side job config section of the Oracle JDBC connector.
+ * <p>
+ * Each public field annotated with {@code @Input} is one input; the
+ * annotation's {@code size} gives the maximum input length where specified.
+ * NOTE(review): the meaning of the individual inputs is defined by the code
+ * that consumes this class, which is not visible here; confirm before
+ * relying on the field names alone.
+ */
+@ConfigClass
+public class FromJobConfig {
+
+ /** Required: the NotEmpty validator rejects an empty value. */
+ @Input(size = 2000, validators = { @Validator(NotEmpty.class)})
+ public String tableName;
+
+ @Input
+ public Boolean consistentRead;
+
+ /** Presumably the SCN used when consistentRead is enabled; TODO confirm. */
+ @Input
+ public Long consistentReadScn;
+
+ @Input(size = 2000)
+ public String partitionList;
+
+ @Input(size = 2000)
+ public String dataChunkMethod;
+
+ @Input(size = 2000)
+ public String dataChunkAllocationMethod;
+
+ @Input(size = 2000)
+ public String whereClauseLocation;
+
+ @Input
+ public Boolean omitLobColumns;
+
+ @Input
+ public String queryHint;
+
+ @Input(size = 2000)
+ public String conditions;
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..6a6c1aa
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/FromJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * FROM-job configuration class of the Oracle JDBC connector; it consists of a
+ * single {@link FromJobConfig} section.
+ */
+@ConfigurationClass
+public class FromJobConfiguration {
+
+  @Config
+  public FromJobConfig fromJobConfig;
+
+  /** Creates the configuration with a newly constructed {@link FromJobConfig}. */
+  public FromJobConfiguration() {
+    this.fromJobConfig = new FromJobConfig();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..990343b
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/LinkConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Link configuration class of the Oracle JDBC connector; it consists of a
+ * single {@link ConnectionConfig} section.
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+
+  @Config
+  public ConnectionConfig connectionConfig;
+
+  /** Creates the configuration with a newly constructed {@link ConnectionConfig}. */
+  public LinkConfiguration() {
+    this.connectionConfig = new ConnectionConfig();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
new file mode 100644
index 0000000..939a87a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Inputs of the TO-side job config section of the Oracle JDBC connector.
+ * <p>
+ * Each public field annotated with {@code @Input} is one input; the
+ * annotation's {@code size} gives the maximum input length where specified.
+ * NOTE(review): the meaning of the individual inputs is defined by the code
+ * that consumes this class, which is not visible here; confirm before
+ * relying on the field names alone.
+ */
+@ConfigClass
+public class ToJobConfig {
+
+ /** Required: the NotEmpty validator rejects an empty value. */
+ @Input(size = 2000, validators = { @Validator(NotEmpty.class)})
+ public String tableName;
+
+ @Input(size = 2000)
+ public String templateTable;
+
+ @Input
+ public Boolean partitioned;
+
+ @Input
+ public Boolean nologging;
+
+ @Input(size = 2000)
+ public String updateKey;
+
+ @Input
+ public Boolean updateMerge;
+
+ @Input
+ public Boolean dropTableIfExists;
+
+ @Input(size = 2000)
+ public String storageClause;
+
+ /** Presumably the storage clause for the temporary mapper tables; TODO confirm. */
+ @Input(size = 2000)
+ public String temporaryStorageClause;
+
+ @Input(size = 2000)
+ public String appendValuesHint;
+
+ @Input
+ public Boolean parallel;
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..b34df1a
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/configuration/ToJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.oracle.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * TO-job configuration class of the Oracle JDBC connector; it consists of a
+ * single {@link ToJobConfig} section.
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+
+  @Config
+  public ToJobConfig toJobConfig;
+
+  /** Creates the configuration with a newly constructed {@link ToJobConfig}. */
+  public ToJobConfiguration() {
+    this.toJobConfig = new ToJobConfig();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/fa3c77b6/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
----------------------------------------------------------------------
diff --git a/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
new file mode 100644
index 0000000..b46bce5
--- /dev/null
+++ b/connector/connector-oracle-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/oracle/util/OracleActiveInstance.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.jdbc.oracle.util;
+
+/**
+ * Wraps data from v$active_instances: the name of an instance and the name of
+ * the host it is associated with. Both values are {@code null} until set.
+ */
+public class OracleActiveInstance {
+
+  private String instanceName;
+  private String hostName;
+
+  /** @return the instance name, or {@code null} if none has been set */
+  public String getInstanceName() {
+    return this.instanceName;
+  }
+
+  /** @param newInstanceName the instance name to store */
+  public void setInstanceName(String newInstanceName) {
+    instanceName = newInstanceName;
+  }
+
+  /** @return the host name, or {@code null} if none has been set */
+  public String getHostName() {
+    return this.hostName;
+  }
+
+  /** @param newHostName the host name to store */
+  public void setHostName(String newHostName) {
+    hostName = newHostName;
+  }
+}