You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/12/16 20:35:46 UTC
sqoop git commit: SQOOP-2727: Sqoop2: Use PreparedStatements for
constructing GenericJDBCPartitions
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 0ab7c05e1 -> 6e33d6afb
SQOOP-2727: Sqoop2: Use PreparedStatements for constructing GenericJDBCPartitions
(Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6e33d6af
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6e33d6af
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6e33d6af
Branch: refs/heads/sqoop2
Commit: 6e33d6afb43a2e6c2b6ac4efa001b3a0380dec6e
Parents: 0ab7c05
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Dec 16 20:35:17 2015 +0100
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Dec 16 20:35:17 2015 +0100
----------------------------------------------------------------------
.../sqoop/common/test/db/DatabaseProvider.java | 4 +-
.../connector/jdbc/GenericJdbcExecutor.java | 2 +-
.../connector/jdbc/GenericJdbcExtractor.java | 33 ++-
.../jdbc/GenericJdbcFromInitializer.java | 26 +-
.../connector/jdbc/GenericJdbcPartition.java | 217 ++++++++++++++-
.../connector/jdbc/GenericJdbcPartitioner.java | 270 ++++++++-----------
.../sqoop/connector/jdbc/TestExtractor.java | 58 ++--
.../jdbc/TestGenericJdbcPartition.java | 78 ++++++
.../sqoop/connector/jdbc/TestPartitioner.java | 68 ++++-
.../jdbc/generic/TableStagedRDBMSTest.java | 6 +
10 files changed, 544 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
index 3b0b568..afc5016 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/db/DatabaseProvider.java
@@ -371,9 +371,9 @@ abstract public class DatabaseProvider {
} else if (value instanceof Character) {
preparedStatement.setString(parameterIndex, value.toString());
} else if (value instanceof Timestamp) {
- preparedStatement.setString(parameterIndex, value.toString());
+ preparedStatement.setTimestamp(parameterIndex, (Timestamp) value);
} else if (value instanceof BigDecimal) {
- preparedStatement.setString(parameterIndex, value.toString());
+ preparedStatement.setBigDecimal(parameterIndex, (BigDecimal) value);
} else {
preparedStatement.setObject(parameterIndex, value);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index ff33a4b..7c943c2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -50,7 +50,7 @@ import java.util.Set;
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings({
"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"})
-public class GenericJdbcExecutor {
+public class GenericJdbcExecutor implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(GenericJdbcExecutor.class);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
index edb2754..d56abec 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -35,7 +36,8 @@ import org.joda.time.LocalDate;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
-@edu.umd.cs.findbugs.annotations.SuppressWarnings("SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE")
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"})
public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {
public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class);
@@ -43,19 +45,14 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
private long rowsRead = 0;
@Override
public void extract(ExtractorContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig, GenericJdbcPartition partition) {
- GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
-
- String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
- String conditions = partition.getConditions();
- query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
- LOG.info("Using query: " + query);
-
rowsRead = 0;
Schema schema = context.getSchema();
Column[] schemaColumns = schema.getColumnsArray();
- try (Statement statement = executor.getConnection().createStatement(
- ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- ResultSet resultSet = statement.executeQuery(query);) {
+ try (
+ GenericJdbcExecutor executor = new GenericJdbcExecutor(linkConfig);
+ PreparedStatement preparedStatement = createPreparedStatement(context, executor, partition);
+ ResultSet resultSet = preparedStatement.executeQuery()
+ ) {
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
if (schemaColumns.length != columnCount) {
@@ -96,8 +93,6 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
throw new SqoopException(
GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
- } finally {
- executor.close();
}
}
@@ -106,4 +101,16 @@ public class GenericJdbcExtractor extends Extractor<LinkConfiguration, FromJobCo
return rowsRead;
}
+ private PreparedStatement createPreparedStatement(ExtractorContext context, GenericJdbcExecutor executor, GenericJdbcPartition partition) throws SQLException {
+ String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL);
+ String condition = partition.getCondition();
+ query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, condition);
+
+ LOG.info("Creating PreparedStatement with query: " + query);
+
+ PreparedStatement preparedStatement = executor.getConnection().prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ partition.addParamsToPreparedStatement(preparedStatement);
+
+ return preparedStatement;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 41be8bb..13715a0 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -259,10 +259,6 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
}
- // Boundaries for the job
- String min = rs.getString(1);
- String max = rs.getString(2);
-
// Type of the partition column
ResultSetMetaData rsmd = rs.getMetaData();
if (rsmd.getColumnCount() != 2) {
@@ -270,6 +266,28 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
}
int columnType = rsmd.getColumnType(1);
+ String min;
+ String max;
+ // Boundaries for the job
+ switch (columnType) {
+ case Types.DATE:
+ min = String.valueOf(rs.getDate(1).getTime());
+ max = String.valueOf(rs.getDate(2).getTime());
+ break;
+ case Types.TIMESTAMP:
+ min = String.valueOf(rs.getTimestamp(1).getTime());
+ max = String.valueOf(rs.getTimestamp(2).getTime());
+ break;
+ case Types.TIME:
+ min = String.valueOf(rs.getTime(1).getTime());
+ max = String.valueOf(rs.getTime(2).getTime());
+ break;
+ default:
+ min = rs.getString(1);
+ max = rs.getString(2);
+ }
+
+
LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);
context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
index 65400ef..924bbb2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
@@ -20,34 +20,231 @@ package org.apache.sqoop.connector.jdbc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.error.code.GenericJdbcConnectorError;
import org.apache.sqoop.job.etl.Partition;
+@edu.umd.cs.findbugs.annotations.SuppressWarnings("DB_DUPLICATE_SWITCH_CLAUSES")
public class GenericJdbcPartition extends Partition {
- private String conditions;
+ private String condition;
+ private List<Integer> sqlTypes;
+ private List<Object> params;
- public void setConditions(String conditions) {
- this.conditions = conditions;
+
+ public GenericJdbcPartition() {
+ sqlTypes = new ArrayList<>();
+ params = new ArrayList<>();
+ }
+
+ public void setCondition(String condition) {
+ this.condition = condition;
+ }
+
+ public String getCondition() {
+ return condition;
+ }
+
+ public void addParamsToPreparedStatement(PreparedStatement ps) throws SQLException {
+ for (int i = 0; i < params.size(); i++) {
+ ps.setObject(i + 1, params.get(i));
+ }
}
- public String getConditions() {
- return conditions;
+ public void addParam(int sqlType, Object param) {
+ sqlTypes.add(sqlType);
+ params.add(param);
+ }
+
+
+ public List<Object> getParams() {
+ return params;
+ }
+
+ public List<Integer> getSqlTypes() {
+ return sqlTypes;
}
@Override
public void readFields(DataInput in) throws IOException {
- conditions = in.readUTF();
+ int numParams = in.readInt();
+ condition = in.readUTF();
+ for (int i = 0; i < numParams; i++) {
+ int type = in.readInt();
+ sqlTypes.add(type);
+ switch (type) {
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ case Types.BIGINT:
+ params.add(in.readLong());
+ break;
+ case Types.REAL:
+ case Types.FLOAT:
+ params.add(in.readDouble());
+ break;
+ case Types.DOUBLE:
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ params.add(new BigDecimal(in.readUTF()));
+ break;
+ case Types.BIT:
+ case Types.BOOLEAN:
+ params.add(in.readBoolean());
+ break;
+ case Types.DATE:
+ params.add(new java.sql.Date(in.readLong()));
+ break;
+ case Types.TIME:
+ params.add(new java.sql.Time(in.readLong()));
+ break;
+ case Types.TIMESTAMP:
+ params.add(new java.sql.Timestamp(in.readLong()));
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ params.add(in.readUTF());
+ break;
+ default:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+ String.valueOf(type));
+ }
+ }
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeUTF(conditions);
+ out.writeInt(sqlTypes.size());
+ out.writeUTF(condition);
+ for (int i = 0; i < sqlTypes.size(); i++) {
+ int type = sqlTypes.get(i);
+ out.writeInt(type);
+ switch (type) {
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ case Types.BIGINT:
+ out.writeLong((Long) params.get(i));
+ break;
+ case Types.REAL:
+ case Types.FLOAT:
+ out.writeDouble((Double) params.get(i));
+ break;
+ case Types.DOUBLE:
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ out.writeUTF(params.get(i).toString());
+ break;
+ case Types.BIT:
+ case Types.BOOLEAN:
+ out.writeBoolean((Boolean) params.get(i));
+ break;
+ case Types.DATE:
+ out.writeLong(((java.sql.Date) params.get(i)).getTime());
+ break;
+ case Types.TIME:
+ out.writeLong(((java.sql.Time) params.get(i)).getTime());
+ break;
+ case Types.TIMESTAMP:
+ out.writeLong(((java.sql.Timestamp) params.get(i)).getTime());
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ out.writeUTF((String) params.get(i));
+ break;
+ default:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+ String.valueOf(type));
+ }
+ }
}
@Override
- public String toString() {
- return conditions;
+ public String toString(){
+ return asStringWithTimezone(TimeZone.getTimeZone("UTC"));
}
-}
+ public String asStringWithTimezone(TimeZone timeZone) {
+ DateFormat dateDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+
+ DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
+ timeDateFormat.setTimeZone(timeZone);
+
+ DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ timestampDateFormat.setTimeZone(timeZone);
+
+ String condition = getCondition();
+ for (int j = 0; j < getParams().size(); j++) {
+ Object param = getParams().get(j);
+ int sqlType = getSqlTypes().get(j);
+ switch (sqlType) {
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ case Types.BIGINT:
+ case Types.REAL:
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ case Types.BIT:
+ case Types.BOOLEAN:
+ condition = condition.replaceFirst("\\?", param.toString());
+ break;
+ case Types.DATE:
+ condition = condition.replaceFirst("\\?", "'" + dateDateFormat.format(param) + "'");
+ break;
+ case Types.TIME:
+ condition = condition.replaceFirst("\\?", "'" + timeDateFormat.format(param) + "'");
+ break;
+ case Types.TIMESTAMP:
+ condition = condition.replaceFirst("\\?", "'" + timestampDateFormat.format(param) + "'");
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ default:
+ condition = condition.replaceFirst("\\?", "'" + param.toString() + "'");
+ break;
+ }
+ }
+ return condition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ GenericJdbcPartition that = (GenericJdbcPartition) o;
+
+ if (getCondition() != null ? !getCondition().equals(that.getCondition())
+ : that.getCondition() != null)
+ return false;
+ if (!getSqlTypes().equals(that.getSqlTypes())) return false;
+ return getParams().equals(that.getParams());
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getCondition() != null ? getCondition().hashCode() : 0;
+ result = 31 * result + getSqlTypes().hashCode();
+ result = 31 * result + getParams().hashCode();
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
index 2a42ed4..77b486c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
@@ -49,7 +49,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
@Override
public List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfig,
- FromJobConfiguration fromJobConfig) {
+ FromJobConfiguration fromJobConfig) {
List<Partition> partitions = new LinkedList<Partition>();
numberPartitions = context.getMaxPartitions();
@@ -65,14 +65,14 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
if (partitionMinValue == null && partitionMaxValue == null) {
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(partitionColumnName + " IS NULL");
+ partition.setCondition(partitionColumnName + " IS NULL");
partitions.add(partition);
return partitions;
}
if (allowNullValueInPartitionColumn) {
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(partitionColumnName + " IS NULL");
+ partition.setCondition(partitionColumnName + " IS NULL");
partitions.add(partition);
if (numberPartitions > 1) {
numberPartitions -= 1;
@@ -83,85 +83,60 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
}
switch (partitionColumnType) {
- case Types.TINYINT:
- case Types.SMALLINT:
- case Types.INTEGER:
- case Types.BIGINT:
- // Integer column
- partitions.addAll(partitionIntegerColumn());
- break;
-
- case Types.REAL:
- case Types.FLOAT:
- case Types.DOUBLE:
- // Floating point column
- partitions.addAll(partitionFloatingPointColumn());
- break;
-
- case Types.NUMERIC:
- case Types.DECIMAL:
- // Decimal column
- partitions.addAll(partitionNumericColumn());
- break;
-
- case Types.BIT:
- case Types.BOOLEAN:
- // Boolean column
- return partitionBooleanColumn();
-
- case Types.DATE:
- case Types.TIME:
- case Types.TIMESTAMP:
- // Date time column
- partitions.addAll(partitionDateTimeColumn());
- break;
-
- case Types.CHAR:
- case Types.VARCHAR:
- case Types.LONGVARCHAR:
- // Text column
- partitions.addAll(partitionTextColumn());
- break;
-
- default:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
- String.valueOf(partitionColumnType));
- }
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ case Types.BIGINT:
+ // Integer column
+ partitions.addAll(partitionIntegerColumn(partitionColumnType));
+ break;
- return partitions;
- }
+ case Types.REAL:
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ // Floating point column
+ partitions.addAll(partitionFloatingPointColumn(partitionColumnType));
+ break;
- protected List<Partition> partitionDateTimeColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ // Decimal column
+ partitions.addAll(partitionNumericColumn(partitionColumnType));
+ break;
+
+ case Types.BIT:
+ case Types.BOOLEAN:
+ // Boolean column
+ return partitionBooleanColumn();
- long minDateValue = 0;
- long maxDateValue = 0;
- SimpleDateFormat sdf = null;
- switch(partitionColumnType) {
case Types.DATE:
- sdf = new SimpleDateFormat("yyyy-MM-dd");
- minDateValue = Date.valueOf(partitionMinValue).getTime();
- maxDateValue = Date.valueOf(partitionMaxValue).getTime();
- break;
case Types.TIME:
- sdf = new SimpleDateFormat("HH:mm:ss");
- minDateValue = Time.valueOf(partitionMinValue).getTime();
- maxDateValue = Time.valueOf(partitionMaxValue).getTime();
+ case Types.TIMESTAMP:
+ // Date time column
+ partitions.addAll(partitionDateTimeColumn());
break;
- // Here should be the type of Types.TIMESTAMP:
- default:
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
- maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
+
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ // Text column
+ partitions.addAll(partitionTextColumn());
break;
+
+ default:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+ String.valueOf(partitionColumnType));
}
+ return partitions;
+ }
- minDateValue += TimeZone.getDefault().getOffset(minDateValue);
- maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
+ protected List<Partition> partitionDateTimeColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+ long minDateValue = Long.parseLong(partitionMinValue);
+ long maxDateValue = Long.parseLong(partitionMaxValue);
long interval = (maxDateValue - minDateValue) / numberPartitions;
long remainder = (maxDateValue - minDateValue) % numberPartitions;
@@ -173,57 +148,41 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
long lowerBound;
long upperBound = minDateValue;
- Object objLB = null;
- Object objUB = null;
-
+ GenericJdbcPartition partition;
for (int i = 1; i < numberPartitions; i++) {
lowerBound = upperBound;
upperBound = lowerBound + interval;
upperBound += (i <= remainder) ? 1 : 0;
+ partition = new GenericJdbcPartition();
switch(partitionColumnType) {
case Types.DATE:
- objLB = new Date(lowerBound);
- objUB = new Date(upperBound);
+ constructDateConditions(partition, Types.DATE, new Date(lowerBound), new Date(upperBound), false);
break;
case Types.TIME:
- objLB = new Time(lowerBound);
- objUB = new Time(upperBound);
-
+ constructDateConditions(partition, Types.TIME, new Time(lowerBound), new Time(upperBound), false);
break;
// Here should be the type of Types.TIMESTAMP:
default:
- objLB = new Timestamp(lowerBound);
- objUB = new Timestamp(upperBound);
+ constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(lowerBound), new Timestamp(upperBound), false);
break;
}
-
- GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructDateConditions(sdf, objLB, objUB, false));
partitions.add(partition);
}
+ partition = new GenericJdbcPartition();
switch(partitionColumnType) {
case Types.DATE:
- objLB = new Date(upperBound);
- objUB = new Date(maxDateValue);
+ constructDateConditions(partition, Types.DATE, new Date(upperBound), new Date(maxDateValue), true);
break;
case Types.TIME:
- objLB = new Time(upperBound);
- objUB = new Time(maxDateValue);
+ constructDateConditions(partition, Types.TIME, new Time(upperBound), new Time(maxDateValue), true);
break;
// Here should be the type of Types.TIMESTAMP:
default:
- objLB = new Timestamp(upperBound);
- objUB = new Timestamp(maxDateValue);
+ constructDateConditions(partition, Types.TIMESTAMP, new Timestamp(upperBound), new Timestamp(maxDateValue), true);
break;
}
-
-
- GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructDateConditions(sdf, objLB, objUB, true));
partitions.add(partition);
return partitions;
}
@@ -236,7 +195,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
// Remove common prefix if any as it does not affect outcome.
int maxPrefixLen = Math.min(partitionMinValue.length(),
- partitionMaxValue.length());
+ partitionMaxValue.length());
// Calculate common prefix length
int cpLen = 0;
@@ -259,8 +218,8 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
// Having one single value means that we can create only one single split
if(minStringBD.equals(maxStringBD)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(constructTextConditions(prefix, 0, 0,
- partitionMinValue, partitionMaxValue, true, true));
+ constructTextConditions(partition, prefix, 0, 0,
+ partitionMinValue, partitionMaxValue, true, true);
partitions.add(partition);
return partitions;
}
@@ -269,7 +228,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
- new BigDecimal(numberPartitions));
+ new BigDecimal(numberPartitions));
if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
splitSize = NUMERIC_MIN_INCREMENT;
}
@@ -289,12 +248,12 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
}
if (splitPoints.size() == 0
- || splitPoints.get(0).compareTo(minStringBD) != 0) {
+ || splitPoints.get(0).compareTo(minStringBD) != 0) {
splitPoints.add(0, minStringBD);
}
if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
- || splitPoints.size() == 1) {
+ || splitPoints.size() == 1) {
splitPoints.add(maxStringBD);
}
@@ -304,8 +263,8 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
BigDecimal end = splitPoints.get(i);
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(constructTextConditions(prefix, start, end,
- partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
+ constructTextConditions(partition, prefix, start, end,
+ partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1);
partitions.add(partition);
start = end;
@@ -315,7 +274,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
}
- protected List<Partition> partitionIntegerColumn() {
+ protected List<Partition> partitionIntegerColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
long minValue = partitionMinValue == null ? Long.MIN_VALUE
@@ -337,20 +296,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
upperBound += (i <= remainder) ? 1 : 0;
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructConditions(lowerBound, upperBound, false));
+ constructConditions(partition, sqlType, lowerBound, upperBound, false);
partitions.add(partition);
}
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructConditions(upperBound, maxValue, true));
+ constructConditions(partition, sqlType, upperBound, maxValue, true);
partitions.add(partition);
return partitions;
}
- protected List<Partition> partitionFloatingPointColumn() {
+ protected List<Partition> partitionFloatingPointColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
@@ -367,20 +324,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
upperBound = lowerBound + interval;
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructConditions(lowerBound, upperBound, false));
+ constructConditions(partition, sqlType, lowerBound, upperBound, false);
partitions.add(partition);
}
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(
- constructConditions(upperBound, maxValue, true));
+ constructConditions(partition, sqlType, upperBound, maxValue, true);
partitions.add(partition);
return partitions;
}
- protected List<Partition> partitionNumericColumn() {
+ protected List<Partition> partitionNumericColumn(int sqlType) {
List<Partition> partitions = new LinkedList<Partition>();
// Having one end in null is not supported
if (partitionMinValue == null || partitionMaxValue == null) {
@@ -393,7 +348,7 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
// Having one single value means that we can create only one single split
if(minValue.equals(maxValue)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(constructConditions(minValue));
+ constructConditions(partition, sqlType, minValue);
partitions.add(partition);
return partitions;
}
@@ -420,17 +375,20 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
}
// Turn the split points into a set of intervals.
- BigDecimal start = splitPoints.get(0);
- for (int i = 1; i < splitPoints.size(); i++) {
- BigDecimal end = splitPoints.get(i);
+ BigDecimal end = splitPoints.get(0);
+ for (int i = 0; i < numberPartitions - 1; i++) {
+ BigDecimal start = splitPoints.get(i);
+ end = splitPoints.get(i + 1);
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+ constructConditions(partition, sqlType, start, end, false);
partitions.add(partition);
-
- start = end;
}
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ constructConditions(partition, sqlType, end, maxValue, true);
+ partitions.add(partition);
+
return partitions;
}
@@ -447,9 +405,9 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
if(minValue.equals(maxValue)) {
GenericJdbcPartition partition = new GenericJdbcPartition();
- conditions.append(partitionColumnName).append(" = ")
- .append(maxValue);
- partition.setConditions(conditions.toString());
+ conditions.append(partitionColumnName).append(" = ?");
+ partition.setCondition(conditions.toString());
+ partition.addParam(Types.BOOLEAN, maxValue);
partitions.add(partition);
return partitions;
}
@@ -459,18 +417,18 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
if (partitionMinValue == null) {
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" IS NULL");
- partition.setConditions(conditions.toString());
+ partition.setCondition(conditions.toString());
partitions.add(partition);
}
partition = new GenericJdbcPartition();
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" = TRUE");
- partition.setConditions(conditions.toString());
+ partition.setCondition(conditions.toString());
partitions.add(partition);
partition = new GenericJdbcPartition();
conditions = new StringBuilder();
conditions.append(partitionColumnName).append(" = FALSE");
- partition.setConditions(conditions.toString());
+ partition.setCondition(conditions.toString());
partitions.add(partition);
return partitions;
}
@@ -493,54 +451,53 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
}
}
- protected String constructConditions(
- Object lowerBound, Object upperBound, boolean lastOne) {
+ protected void constructConditions(
+ GenericJdbcPartition partition, int sqlType, Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
- conditions.append(lowerBound);
- conditions.append(" <= ");
+ partition.addParam(sqlType, lowerBound);
+ conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append(upperBound);
- return conditions.toString();
+ conditions.append(lastOne ? " <= ?" : " < ?");
+ partition.addParam(sqlType, upperBound);
+
+ partition.setCondition(conditions.toString());
}
- protected String constructConditions(Object value) {
- return new StringBuilder()
- .append(partitionColumnName)
- .append(" = ")
- .append(value)
- .toString()
- ;
+ protected void constructConditions(GenericJdbcPartition partition, int sqlType, Object value) {
+ String condition = partitionColumnName + " = ?";
+ partition.addParam(sqlType, value);
+ partition.setCondition(condition);
}
- protected String constructDateConditions(SimpleDateFormat sdf,
- Object lowerBound, Object upperBound, boolean lastOne) {
+ protected void constructDateConditions(GenericJdbcPartition partition, int dateType,
+ Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
- conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
- conditions.append(" <= ");
+ partition.addParam(dateType, lowerBound);
+ conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
- return conditions.toString();
+ conditions.append(lastOne ? " <= ?" : " < ?");
+ partition.setCondition(conditions.toString());
+ partition.addParam(dateType, upperBound);
}
- protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
- String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
+ protected void constructTextConditions(GenericJdbcPartition partition, String prefix, Object lowerBound, Object upperBound,
+ String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
- conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
- conditions.append(" <= ");
+ partition.addParam(Types.VARCHAR, firstOne ? lowerStringBound : lbString);
+ conditions.append("? <= ");
conditions.append(partitionColumnName);
conditions.append(" AND ");
conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
- return conditions.toString();
+ conditions.append(lastOne ? " <= ?" : " < ?");
+ partition.addParam(Types.VARCHAR, lastOne ? upperStringBound : ubString);
+
+ partition.setCondition(conditions.toString());
}
/**
@@ -607,5 +564,4 @@ public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJ
return sb.toString();
}
-
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 3b52128..9847d5e 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -36,6 +36,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.sql.Types;
+
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -54,7 +56,7 @@ public class TestExtractor {
public TestExtractor() {
tableName = getClass().getSimpleName().toUpperCase();
- nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
+ nullDataTableName = getClass().getSimpleName().toUpperCase() + "NULL";
}
@BeforeMethod(alwaysRun = true)
@@ -63,13 +65,13 @@ public class TestExtractor {
if (!executor.existTable(tableName)) {
executor.executeUpdate("CREATE TABLE "
- + executor.encloseIdentifier(tableName)
- + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+ + executor.encloseIdentifier(tableName)
+ + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = START + i;
String sql = "INSERT INTO " + executor.encloseIdentifier(tableName)
- + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
+ + " VALUES(" + value + ", " + value + ", '" + value + "', '2004-10-19')";
executor.executeUpdate(sql);
}
}
@@ -93,7 +95,7 @@ public class TestExtractor {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
- "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
+ "SELECT * FROM " + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}");
GenericJdbcPartition partition;
@@ -107,15 +109,21 @@ public class TestExtractor {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
partition = new GenericJdbcPartition();
- partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
+ partition.setCondition("? <= DCOL AND DCOL < ?");
+ partition.addParam(Types.NUMERIC, -50.0);
+ partition.addParam(Types.NUMERIC, -16.6666666666666665);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
- partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
+ partition.setCondition("? <= DCOL AND DCOL < ?");
+ partition.addParam(Types.NUMERIC, -16.6666666666666665);
+ partition.addParam(Types.NUMERIC, 16.6666666666666667);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
- partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
+ partition.setCondition("? <= DCOL AND DCOL <= ?");
+ partition.addParam(Types.NUMERIC, 16.6666666666666667);
+ partition.addParam(Types.NUMERIC, 50.0);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@@ -132,8 +140,8 @@ public class TestExtractor {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
- "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
- + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+ "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL,SQOOP_SUBQUERY_ALIAS.DATECOL FROM " + "(SELECT * FROM "
+ + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcPartition partition;
@@ -147,15 +155,21 @@ public class TestExtractor {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
partition = new GenericJdbcPartition();
- partition.setConditions("-50 <= ICOL AND ICOL < -16");
+ partition.setCondition("? <= ICOL AND ICOL < ?");
+ partition.addParam(Types.NUMERIC, -50);
+ partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
- partition.setConditions("-16 <= ICOL AND ICOL < 17");
+ partition.setCondition("? <= ICOL AND ICOL < ?");
+ partition.addParam(Types.NUMERIC, -16);
+ partition.addParam(Types.NUMERIC, 17);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
partition = new GenericJdbcPartition();
- partition.setConditions("17 <= ICOL AND ICOL < 50");
+ partition.setCondition("? <= ICOL AND ICOL < ?");
+ partition.addParam(Types.NUMERIC, 17);
+ partition.addParam(Types.NUMERIC, 50);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@@ -172,9 +186,9 @@ public class TestExtractor {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(
- GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
- "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
- + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
+ "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM " + "(SELECT * FROM "
+ + executor.encloseIdentifier(tableName) + " WHERE ${CONDITIONS}) SQOOP_SUBQUERY_ALIAS");
GenericJdbcPartition partition = new GenericJdbcPartition();
@@ -183,7 +197,9 @@ public class TestExtractor {
Schema schema = new Schema("TestIncorrectColumns");
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
- partition.setConditions("-50 <= ICOL AND ICOL < -16");
+ partition.setCondition("? <= ICOL AND ICOL < ?");
+ partition.addParam(Types.NUMERIC, -50);
+ partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
@@ -193,7 +209,7 @@ public class TestExtractor {
if (!executor.existTable(nullDataTableName)) {
executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(nullDataTableName)
- + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
+ + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE)");
for (int i = 0; i < NUMBER_OF_ROWS; i++) {
int value = i;
@@ -210,7 +226,7 @@ public class TestExtractor {
FromJobConfiguration jobConfig = new FromJobConfiguration();
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL,
- "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
+ "SELECT * FROM " + executor.encloseIdentifier(nullDataTableName) + " WHERE ${CONDITIONS}");
Extractor extractor = new GenericJdbcExtractor();
DummyNullDataWriter writer = new DummyNullDataWriter();
@@ -220,7 +236,9 @@ public class TestExtractor {
ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
GenericJdbcPartition partition = new GenericJdbcPartition();
- partition.setConditions("-50 <= ICOL AND ICOL < -16");
+ partition.setCondition("? <= ICOL AND ICOL < ?");
+ partition.addParam(Types.NUMERIC, -50);
+ partition.addParam(Types.NUMERIC, -16);
extractor.extract(extractorContext, linkConfig, jobConfig, partition);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
new file mode 100644
index 0000000..0a97939
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcPartition.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import static org.testng.Assert.*;
+
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.sql.Types;
+
+public class TestGenericJdbcPartition {
+
+ @Test
+ public void testSerialization() throws Exception {
+ GenericJdbcPartition expectedPartition = new GenericJdbcPartition();
+ expectedPartition.setCondition("TEST_CONDITION");
+
+ expectedPartition.addParam(Types.TINYINT, 1L);
+ expectedPartition.addParam(Types.SMALLINT, 2L);
+ expectedPartition.addParam(Types.INTEGER, 3L);
+ expectedPartition.addParam(Types.BIGINT, 4L);
+
+ expectedPartition.addParam(Types.REAL, 1.1);
+ expectedPartition.addParam(Types.FLOAT, 1.2);
+
+ expectedPartition.addParam(Types.DOUBLE, new BigDecimal(1.3));
+ expectedPartition.addParam(Types.NUMERIC, new BigDecimal(1.4));
+ expectedPartition.addParam(Types.DECIMAL, new BigDecimal(1.5));
+
+ expectedPartition.addParam(Types.BIT, true);
+ expectedPartition.addParam(Types.BOOLEAN, false);
+
+ expectedPartition.addParam(Types.DATE, java.sql.Date.valueOf("2015-12-14"));
+ expectedPartition.addParam(Types.TIME, java.sql.Time.valueOf("00:00:00"));
+ expectedPartition.addParam(Types.TIMESTAMP, java.sql.Timestamp.valueOf("2015-12-16 00:00:00"));
+
+ expectedPartition.addParam(Types.CHAR, "a");
+ expectedPartition.addParam(Types.VARCHAR, "aa");
+ expectedPartition.addParam(Types.LONGVARCHAR, "aaa");
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(byteArrayOutputStream);
+
+ expectedPartition.write(dataOutput);
+
+ GenericJdbcPartition actualPartition = new GenericJdbcPartition();
+
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+ DataInput dataInput = new DataInputStream(byteArrayInputStream);
+
+ actualPartition.readFields(dataInput);
+
+ assertEquals(actualPartition, expectedPartition);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
index 3a767ab..b250f94 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
@@ -22,8 +22,12 @@ import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.TimeZone;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
@@ -249,6 +253,37 @@ public class TestPartitioner {
});
}
+ // We may round the quotient when calculating splitsize, this tests ensure we catch those values in the final partition
+ @Test
+ public void testNumericInaccurateSplit() throws Exception {
+ MutableContext context = new MutableMapContext();
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(1)));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(13)));
+
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+ Partitioner partitioner = new GenericJdbcPartitioner();
+ PartitionerContext partitionerContext = new PartitionerContext(context, 11, null, "test_user");
+ List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
+
+ verifyResult(partitions, new String[] {
+ "1 <= DCOL AND DCOL < 2",
+ "2 <= DCOL AND DCOL < 3",
+ "3 <= DCOL AND DCOL < 4",
+ "4 <= DCOL AND DCOL < 5",
+ "5 <= DCOL AND DCOL < 6",
+ "6 <= DCOL AND DCOL < 7",
+ "7 <= DCOL AND DCOL < 8",
+ "8 <= DCOL AND DCOL < 9",
+ "9 <= DCOL AND DCOL < 10",
+ "10 <= DCOL AND DCOL < 11",
+ "11 <= DCOL AND DCOL <= 13"
+ });
+ }
+
@Test
public void testNumericSinglePartition() throws Exception {
MutableContext context = new MutableMapContext();
@@ -276,10 +311,9 @@ public class TestPartitioner {
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
- Date.valueOf("2004-10-20").toString());
- context.setString(GenericJdbcConnectorConstants
- .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-10-17")
- .toString());
+ String.valueOf(Date.valueOf("2004-10-20").getTime()));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+ String.valueOf(Date.valueOf("2013-10-17").getTime()));
LinkConfiguration linkConfig = new LinkConfiguration();
@@ -300,15 +334,21 @@ public class TestPartitioner {
@Test
public void testTimePartition() throws Exception {
+ DateFormat timeDateFormat = new SimpleDateFormat("HH:mm:ss");
+ timeDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ Long startTime = timeDateFormat.parse("01:01:01").getTime();
+ Long endTime = timeDateFormat.parse("10:40:50").getTime();
+
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
- Time.valueOf("01:01:01").toString());
+ String.valueOf(startTime));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
- Time.valueOf("10:40:50").toString());
+ String.valueOf(endTime));
LinkConfiguration linkConfig = new LinkConfiguration();
@@ -327,15 +367,21 @@ public class TestPartitioner {
@Test
public void testTimestampPartition() throws Exception {
+ DateFormat timestampDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ timestampDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ Long startTime = timestampDateFormat.parse("2013-01-01 01:01:01.123").getTime();
+ Long endTime = timestampDateFormat.parse("2013-12-31 10:40:50.654").getTime();
+
MutableContext context = new MutableMapContext();
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
context.setString(GenericJdbcConnectorConstants
.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
- Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
+ String.valueOf(startTime));
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
- Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
+ String.valueOf(endTime));
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
@@ -513,8 +559,8 @@ public class TestPartitioner {
Iterator<Partition> iterator = partitions.iterator();
for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i],
- ((GenericJdbcPartition)iterator.next()).getConditions());
+ GenericJdbcPartition partition = ((GenericJdbcPartition) iterator.next());
+ assertEquals(partition.toString(), expected[i]);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6e33d6af/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index 2565c3f..70b6eff 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -89,4 +89,10 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
dropTable();
}
+ // shorter name for oracle
+ @Override
+ public TableName getTableName() {
+ return new TableName("stagedrdbms");
+ }
+
}