You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/13 22:40:23 UTC
git commit: SQOOP-1013: Sqoop2: Provide splitters for additional data types to Generic JDBC Connector
Updated Branches:
refs/heads/sqoop2 6d1779a05 -> 84127409a
SQOOP-1013: Sqoop2: Provide splitters for additional data types to Generic JDBC Connector
(Venkat Ranganathan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/84127409
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/84127409
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/84127409
Branch: refs/heads/sqoop2
Commit: 84127409a64aeacc220f21566267cf775b60958f
Parents: 6d1779a
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Jun 13 13:39:43 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Jun 13 13:39:43 2013 -0700
----------------------------------------------------------------------
.../jdbc/GenericJdbcImportPartitioner.java | 338 +++++++++++++++++--
.../connector/jdbc/TestImportPartitioner.java | 184 ++++++++++
2 files changed, 499 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index f80f30d..eef18f2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -18,6 +18,9 @@
package org.apache.sqoop.connector.jdbc;
import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.sql.Types;
import java.util.LinkedList;
import java.util.List;
@@ -33,6 +36,7 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
private long numberPartitions;
private String partitionColumnName;
private int partitionColumnType;
@@ -47,6 +51,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
+    // When neither boundary is known, fall back to a single partition that
+    // selects the rows whose partition column is NULL.
+    if (partitionMinValue == null && partitionMaxValue == null) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      // The leading space matters: without it the column name and the keyword
+      // fuse into one token (e.g. "DCOLIS NULL").
+      partition.setConditions(partitionColumnName + " IS NULL");
+      partitions.add(partition);
+      return partitions;
+    }
+
switch (partitionColumnType) {
case Types.TINYINT:
case Types.SMALLINT:
@@ -69,19 +81,19 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
case Types.BIT:
case Types.BOOLEAN:
// Boolean column
- // TODO: Add partition function
+ return partitionBooleanColumn();
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
// Date time column
- // TODO: Add partition function
+ return partitionDateTimeColumn();
case Types.CHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
// Text column
- // TODO: Add partition function
+ return partitionTextColumn();
default:
throw new SqoopException(
@@ -89,18 +101,168 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
String.valueOf(partitionColumnType));
}
}
+  /**
+   * Splits a DATE, TIME or TIMESTAMP column into ranges of (nearly) equal
+   * length in milliseconds.
+   *
+   * <p>The boundaries are parsed with {@code Date.valueOf}, {@code Time.valueOf}
+   * or {@code Timestamp.valueOf} according to the column type. Every range is
+   * half-open ({@code lb <= col < ub}) except the last one, which is closed so
+   * that the maximum itself is imported.
+   *
+   * @return the partitions, ordered from the minimum to the maximum
+   * @throws SqoopException GENERIC_JDBC_CONNECTOR_0015 if either boundary is
+   *         null
+   */
+  protected List<Partition> partitionDateTimeColumn() {
-  protected List<Partition> partitionIntegerColumn() {
+    // Having one end in null is not supported
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+
+    // Keep the parsed maximum: it is used verbatim as the last upper bound so
+    // that sub-millisecond nanos of a TIMESTAMP, which getTime() drops, do not
+    // leave the real maximum outside the last partition.
+    java.util.Date minObject;
+    java.util.Date maxObject;
+    switch (partitionColumnType) {
+      case Types.DATE:
+        minObject = Date.valueOf(partitionMinValue);
+        maxObject = Date.valueOf(partitionMaxValue);
+        break;
+      case Types.TIME:
+        minObject = Time.valueOf(partitionMinValue);
+        maxObject = Time.valueOf(partitionMaxValue);
+        break;
+      case Types.TIMESTAMP:
+        minObject = Timestamp.valueOf(partitionMinValue);
+        maxObject = Timestamp.valueOf(partitionMaxValue);
+        break;
+      default:
+        throw new IllegalStateException(
+            "Not a date/time column type: " + partitionColumnType);
+    }
+
+    long minDateValue = minObject.getTime();
+    long maxDateValue = maxObject.getTime();
+    long range = maxDateValue - minDateValue;
+
+    // Work on a local copy instead of overwriting the numberPartitions field.
+    long partitionCount = numberPartitions;
+    long interval = range / partitionCount;
+    long remainder = range % partitionCount;
+    if (interval == 0) {
+      // Fewer milliseconds than requested partitions: one millisecond each.
+      partitionCount = remainder;
+    }
+
+    List<Partition> partitions = new LinkedList<Partition>();
+    long upperBound = minDateValue;
+    for (long i = 1; i < partitionCount; i++) {
+      long lowerBound = upperBound;
+      // The first 'remainder' ranges are one millisecond longer.
+      upperBound = lowerBound + interval + ((i <= remainder) ? 1 : 0);
+
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(constructDateConditions(
+          toDateTimeObject(lowerBound), toDateTimeObject(upperBound), false));
+      partitions.add(partition);
+    }
+
+    // Last range: closed, and ending at the exact parsed maximum.
+    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+    partition.setConditions(
+        constructDateConditions(toDateTimeObject(upperBound), maxObject, true));
+    partitions.add(partition);
+    return partitions;
+  }
+
+  /**
+   * Wraps epoch milliseconds in the java.sql type (Date, Time or Timestamp)
+   * that matches the partition column type.
+   */
+  private Object toDateTimeObject(long millis) {
+    switch (partitionColumnType) {
+      case Types.DATE:
+        return new Date(millis);
+      case Types.TIME:
+        return new Time(millis);
+      default:
+        return new Timestamp(millis);
+    }
+  }
+
+  /**
+   * Splits a CHAR/VARCHAR/LONGVARCHAR column by mapping strings to numbers.
+   *
+   * <p>The common prefix of the two boundaries is removed. The remaining
+   * suffixes are mapped to BigDecimals with {@link #textToBigDecimal(String)}.
+   * The numeric range is split evenly, and each split point is turned back into
+   * a string condition by constructTextConditions.
+   *
+   * @return the partitions, ordered from the minimum to the maximum
+   * @throws SqoopException GENERIC_JDBC_CONNECTOR_0015 if either boundary is
+   *         null
+   */
+  protected List<Partition> partitionTextColumn() {
     List<Partition> partitions = new LinkedList<Partition>();
-    if (partitionMinValue == null && partitionMaxValue == null) {
+    // Having one end in null is not supported
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+
+    // Remove the common prefix, if any: it does not change the ordering of
+    // values between the boundaries, and dropping it leaves the differing
+    // characters for the limited-precision numeric mapping.
+    int maxPrefixLen = Math.min(partitionMinValue.length(),
+        partitionMaxValue.length());
+    int cpLen = 0;
+    while (cpLen < maxPrefixLen
+        && partitionMinValue.charAt(cpLen) == partitionMaxValue.charAt(cpLen)) {
+      cpLen++;
+    }
+    // Do not let the prefix end between the two halves of a surrogate pair, so
+    // that both suffixes start on a whole code point.
+    if (cpLen > 0 && Character.isHighSurrogate(partitionMinValue.charAt(cpLen - 1))) {
+      cpLen--;
+    }
+
+    // The common prefix has length 'cpLen'. Extract it from both.
+    String prefix = partitionMinValue.substring(0, cpLen);
+    String minStringValue = partitionMinValue.substring(cpLen);
+    String maxStringValue = partitionMaxValue.substring(cpLen);
+
+    BigDecimal minStringBD = textToBigDecimal(minStringValue);
+    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);
+
+    // Having one single value means that we can create only one single split.
+    // compareTo() rather than equals(): BigDecimal.equals also compares scale.
+    if (minStringBD.compareTo(maxStringBD) == 0) {
       GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + "IS NULL");
+      partition.setConditions(constructTextConditions(prefix, maxStringBD));
       partitions.add(partition);
       return partitions;
     }
-    long minValue = Long.parseLong(partitionMinValue);
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
+        new BigDecimal(numberPartitions));
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    BigDecimal curVal = minStringBD;
+
+    while (curVal.compareTo(maxStringBD) <= 0) {
+      splitPoints.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splitPoints.size() == 0
+        || splitPoints.get(0).compareTo(minStringBD) != 0) {
+      splitPoints.add(0, minStringBD);
+    }
+
+    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
+        || splitPoints.size() == 1) {
+      splitPoints.add(maxStringBD);
+    }
+
+    // Turn the split points into a set of string intervals; consecutive
+    // partitions share a boundary and only the last one is closed.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(constructTextConditions(prefix, start,
+          end, i == splitPoints.size() - 1));
+      partitions.add(partition);
+
+      start = end;
+    }
+
+    return partitions;
+  }
+
+
+ protected List<Partition> partitionIntegerColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+    // Having one end in null is not supported: a missing minimum cannot be
+    // defaulted to Long.MIN_VALUE because (maxValue - minValue) below would
+    // overflow.
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+    long minValue = Long.parseLong(partitionMinValue);
long maxValue = Long.parseLong(partitionMaxValue);
long interval = (maxValue - minValue) / numberPartitions;
@@ -134,14 +296,9 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
protected List<Partition> partitionFloatingPointColumn() {
List<Partition> partitions = new LinkedList<Partition>();
- if (partitionMinValue == null && partitionMaxValue == null) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(partitionColumnName + "IS NULL");
- partitions.add(partition);
- return partitions;
- }
- double minValue = Double.parseDouble(partitionMinValue);
+    // Having one end in null is not supported. Double.MIN_VALUE is the smallest
+    // *positive* double, so it would exclude zero and every negative value, and
+    // -Double.MAX_VALUE would make (maxValue - minValue) below overflow.
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+    double minValue = Double.parseDouble(partitionMinValue);
double maxValue = Double.parseDouble(partitionMaxValue);
double interval = (maxValue - minValue) / numberPartitions;
@@ -168,15 +325,6 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
protected List<Partition> partitionNumericColumn() {
List<Partition> partitions = new LinkedList<Partition>();
-
- // All null values will result in single partition
- if (partitionMinValue == null && partitionMaxValue == null) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(partitionColumnName + "IS NULL");
- partitions.add(partition);
- return partitions;
- }
-
// Having one end in null is not supported
if (partitionMinValue == null || partitionMaxValue == null) {
throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
@@ -190,12 +338,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
partition.setConditions(constructConditions(minValue));
partitions.add(partition);
+ return partitions;
}
// Get all the split points together.
List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+
if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
splitSize = NUMERIC_MIN_INCREMENT;
}
@@ -227,6 +377,60 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
return partitions;
}
+  /**
+   * Splits a BIT/BOOLEAN column into one partition per distinct value.
+   *
+   * <p>If both boundaries parse to the same non-null value, a single equality
+   * partition is produced. Otherwise a TRUE and a FALSE partition are
+   * produced, preceded by an IS NULL partition when either boundary is null.
+   */
+  protected List<Partition> partitionBooleanColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    Boolean minValue = parseBooleanValue(partitionMinValue);
+    Boolean maxValue = parseBooleanValue(partitionMaxValue);
+
+    // Having one single value means that we can create only one single split.
+    // The null check comes first: equals() on a null minimum would throw.
+    if (minValue != null && minValue.equals(maxValue)) {
+      partitions.add(booleanPartition(partitionColumnName + " = " + maxValue));
+      return partitions;
+    }
+
+    // NULL rows match neither "= TRUE" nor "= FALSE", so give them their own
+    // partition whenever a boundary is null.
+    if (minValue == null || maxValue == null) {
+      partitions.add(booleanPartition(partitionColumnName + " IS NULL"));
+    }
+    partitions.add(booleanPartition(partitionColumnName + " = TRUE"));
+    partitions.add(booleanPartition(partitionColumnName + " = FALSE"));
+    return partitions;
+  }
+
+  /** Creates a partition selecting the rows that satisfy {@code condition}. */
+  private GenericJdbcImportPartition booleanPartition(String condition) {
+    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+    partition.setConditions(condition);
+    return partition;
+  }
+
+  /**
+   * Interprets a boundary value of a boolean partition column.
+   *
+   * @param value textual boundary; "1" and "0" are accepted in addition to the
+   *        strings handled by Boolean's parser, which treats any string other
+   *        than "true" (ignoring case) as false
+   * @return the parsed value, or null when {@code value} is null
+   */
+  private Boolean parseBooleanValue(String value) {
+    if (value == null) {
+      return null;
+    }
+    if ("1".equals(value)) {
+      return Boolean.TRUE;
+    }
+    if ("0".equals(value)) {
+      return Boolean.FALSE;
+    }
+    return Boolean.valueOf(value);
+  }
+
protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
try {
return numerator.divide(denominator);
@@ -256,4 +460,92 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
.toString()
;
}
+
+  /**
+   * Builds the range condition of one date/time partition.
+   *
+   * @param lowerBound inclusive lower bound, rendered with toString()
+   * @param upperBound upper bound, rendered with toString()
+   * @param lastOne true for the last partition, whose upper bound is inclusive
+   * @return a condition such as {@code '2013-01-01' <= COL AND COL < '2013-05-02'}
+   */
+  protected String constructDateConditions(
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    String upperOperator = lastOne ? " <= " : " < ";
+    return new StringBuilder()
+        .append('\'').append(lowerBound.toString()).append('\'')
+        .append(" <= ")
+        .append(partitionColumnName)
+        .append(" AND ")
+        .append(partitionColumnName)
+        .append(upperOperator)
+        .append('\'').append(upperBound.toString()).append('\'')
+        .toString();
+  }
+
+  /**
+   * Builds the range condition of one text partition.
+   *
+   * <p>Both bounds are {@code prefix} followed by the text form of a BigDecimal
+   * split point. Embedded single quotes are doubled so that each bound stays a
+   * single SQL string literal.
+   *
+   * @param prefix common prefix of the partition boundaries
+   * @param lowerBound inclusive lower split point (a BigDecimal)
+   * @param upperBound upper split point (a BigDecimal)
+   * @param lastOne true for the last partition, whose upper bound is inclusive
+   * @return a condition such as {@code 'A' <= COL AND COL < 'B'}
+   */
+  protected String constructTextConditions(String prefix,
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    String lbString = prefix + bigDecimalToText((BigDecimal) lowerBound);
+    String ubString = prefix + bigDecimalToText((BigDecimal) upperBound);
+    if (lastOne && partitionMaxValue != null
+        && partitionMaxValue.startsWith(ubString)) {
+      // Only the leading MAX_CHARS_TO_CONVERT characters of a boundary survive
+      // the text -> BigDecimal -> text round trip, so ubString may be a
+      // truncated maximum. Ending the last partition there would skip rows
+      // between the truncated and the real maximum, so use the real one.
+      ubString = partitionMaxValue;
+    }
+    StringBuilder conditions = new StringBuilder();
+    conditions.append('\'').append(lbString.replace("'", "''")).append('\'');
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append('\'').append(ubString.replace("'", "''")).append('\'');
+    return conditions.toString();
+  }
+
+  /**
+   * Builds the equality condition used when both text boundaries map to the
+   * same value.
+   *
+   * @param prefix common prefix of the boundaries
+   * @param value split point of the remaining suffix (a BigDecimal)
+   * @return a condition such as {@code COL = 'AAA'}
+   */
+  protected String constructTextConditions(String prefix, Object value) {
+    String text = prefix + bigDecimalToText((BigDecimal) value);
+    // Double embedded quotes so the value stays a single SQL string literal.
+    return new StringBuilder()
+        .append(partitionColumnName)
+        .append(" = ").append('\'')
+        .append(text.replace("'", "''"))
+        .append('\'').toString();
+  }
+
+
+  /**
+   * Radix of the string/number mapping. 2^21 = 2097152 is larger than the
+   * largest Unicode code point (U+10FFFF), so every code point fits in one
+   * digit.
+   */
+  private static final BigDecimal UNITS_BASE = new BigDecimal(2097152);
+
+  /**
+   * Limits how much of a string takes part in the mapping: only code points
+   * that start within its first MAX_CHARS_TO_CONVERT UTF-16 units are used.
+   */
+  private static final int MAX_CHARS_TO_CONVERT = 4;
+
+  /**
+   * Maps a string to a number in [0, 1) so that text ranges can be split
+   * arithmetically.
+   *
+   * <p>A string with leading code points c_1, c_2, ..., c_k is read as the
+   * base-2^21 fraction 0.c_1 c_2 ... c_k, i.e. the sum of c_i / 2^(21 * i).
+   * Only the leading code points are used (see MAX_CHARS_TO_CONVERT), so
+   * strings that share them map to the same value.
+   *
+   * @param str string to convert; must not be null
+   * @return the base-2^21 fraction built from the leading code points of str
+   */
+  private BigDecimal textToBigDecimal(String str) {
+    int limit = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
+    BigDecimal sum = BigDecimal.ZERO;
+    BigDecimal placeValue = UNITS_BASE;
+    int index = 0;
+    while (index < limit) {
+      int codePoint = str.codePointAt(index);
+      sum = sum.add(divide(new BigDecimal(codePoint), placeValue));
+      placeValue = placeValue.multiply(UNITS_BASE);
+      index += Character.charCount(codePoint);
+    }
+    return sum;
+  }
+
+  /**
+   * Inverse of textToBigDecimal: reads up to MAX_CHARS_TO_CONVERT base-2^21
+   * digits of {@code bd} and emits each one as a code point. A zero digit ends
+   * the string.
+   *
+   * <p>Split points that lie between two mapped strings can contain digits
+   * above U+10FFFF, which Character.toChars rejects with an
+   * IllegalArgumentException (e.g. min "A", max "B", three partitions). Such
+   * a digit, or one equal to U+10FFFF, is emitted as U+10FFFF and ends the
+   * string. This keeps the mapping non-decreasing, so consecutive partitions
+   * still meet without gaps or overlaps.
+   *
+   * @param bd split point, expected in [0, 1)
+   * @return the string form of the split point
+   */
+  private String bigDecimalToText(BigDecimal bd) {
+    BigDecimal curVal = bd.stripTrailingZeros();
+    StringBuilder sb = new StringBuilder();
+
+    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
+      curVal = curVal.multiply(UNITS_BASE);
+      int cp = curVal.intValue();
+      if (0 == cp) {
+        break;
+      }
+      if (cp >= Character.MAX_CODE_POINT) {
+        sb.appendCodePoint(Character.MAX_CODE_POINT);
+        break;
+      }
+      curVal = curVal.subtract(new BigDecimal(cp));
+      sb.appendCodePoint(cp);
+    }
+    return sb.toString();
+  }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index ee314d0..522a515 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -18,6 +18,9 @@
package org.apache.sqoop.connector.jdbc;
import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
@@ -26,6 +29,7 @@ import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
@@ -257,6 +261,186 @@ public class TestImportPartitioner extends TestCase {
});
}
+
+  public void testDatePartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "DCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.DATE));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        Date.valueOf("2013-01-01").toString());
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+        Date.valueOf("2013-12-31").toString());
+
+    // One year split three ways; only the last range includes its upper bound.
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(
+        new PartitionerContext(context, 3),
+        new ConnectionConfiguration(),
+        new ImportJobConfiguration());
+
+    verifyResult(partitions, new String[] {
+        "'2013-01-01' <= DCOL AND DCOL < '2013-05-02'",
+        "'2013-05-02' <= DCOL AND DCOL < '2013-08-31'",
+        "'2013-08-31' <= DCOL AND DCOL <= '2013-12-31'",
+    });
+  }
+
+  public void testTimePartition() throws Exception {
+    String minTime = Time.valueOf("01:01:01").toString();
+    String maxTime = Time.valueOf("10:40:50").toString();
+
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "TCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.TIME));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        minTime);
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+        maxTime);
+
+    // Three consecutive time ranges; only the last one is closed.
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(
+        new PartitionerContext(context, 3),
+        new ConnectionConfiguration(),
+        new ImportJobConfiguration());
+
+    verifyResult(partitions, new String[] {
+        "'01:01:01' <= TCOL AND TCOL < '04:14:17'",
+        "'04:14:17' <= TCOL AND TCOL < '07:27:33'",
+        "'07:27:33' <= TCOL AND TCOL <= '10:40:50'",
+    });
+  }
+
+  public void testTimestampPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "TSCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.TIMESTAMP));
+
+    // The boundaries carry milliseconds, which must show up in the conditions.
+    Timestamp lowest = Timestamp.valueOf("2013-01-01 01:01:01.123");
+    Timestamp highest = Timestamp.valueOf("2013-12-31 10:40:50.654");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        lowest.toString());
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+        highest.toString());
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext,
+        new ConnectionConfiguration(), new ImportJobConfiguration());
+
+    verifyResult(partitions, new String[] {
+        "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 13:14:17.634'",
+        "'2013-05-02 13:14:17.634' <= TSCOL AND TSCOL < '2013-09-01 00:27:34.144'",
+        "'2013-09-01 00:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'",
+    });
+  }
+
+  public void testBooleanPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "BCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.BOOLEAN));
+    // "0" and "1" are accepted as the FALSE and TRUE boundaries.
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, "0");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, "1");
+
+    // Three partitions are requested, but a boolean column yields one per value.
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(
+        new PartitionerContext(context, 3),
+        new ConnectionConfiguration(),
+        new ImportJobConfiguration());
+
+    verifyResult(partitions, new String[] {"BCOL = TRUE", "BCOL = FALSE"});
+  }
+
+  public void testVarcharPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "VCCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.VARCHAR));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, "A");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z");
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(
+        new PartitionerContext(context, 25),
+        new ConnectionConfiguration(),
+        new ImportJobConfiguration());
+
+    // One partition per letter from 'A' to 'Y'; the last one also includes 'Z'.
+    String[] expected = new String[25];
+    for (int i = 0; i < expected.length; i++) {
+      char lower = (char) ('A' + i);
+      char upper = (char) (lower + 1);
+      String op = (i == expected.length - 1) ? " <= " : " < ";
+      expected[i] = "'" + lower + "' <= VCCOL AND VCCOL" + op + "'" + upper + "'";
+    }
+    verifyResult(partitions, expected);
+  }
+
+  public void testVarcharPartitionWithCommonPrefix() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+        "VCCOL");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE,
+        String.valueOf(Types.VARCHAR));
+    // Both boundaries share the prefix "AA"; only the last character varies.
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF");
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(
+        new PartitionerContext(context, 5),
+        new ConnectionConfiguration(),
+        new ImportJobConfiguration());
+
+    String[] expected = new String[5];
+    for (int i = 0; i < expected.length; i++) {
+      String lower = "AA" + (char) ('A' + i);
+      String upper = "AA" + (char) ('B' + i);
+      String op = (i == expected.length - 1) ? " <= " : " < ";
+      expected[i] = "'" + lower + "' <= VCCOL AND VCCOL" + op + "'" + upper + "'";
+    }
+    verifyResult(partitions, expected);
+  }
+
private void verifyResult(List<Partition> partitions,
String[] expected) {
assertEquals(expected.length, partitions.size());