You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2013/02/15 08:49:26 UTC
git commit: SQOOP-785: Add splitter implementation for
NUMERIC/DECIMAL type
Updated Branches:
refs/heads/sqoop2 c8af63092 -> 641a9c856
SQOOP-785: Add splitter implementation for NUMERIC/DECIMAL type
(Jarcec Cecho via Cheolsoo Park)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/641a9c85
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/641a9c85
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/641a9c85
Branch: refs/heads/sqoop2
Commit: 641a9c856f2bfdcb3612a028d522969507c1e336
Parents: c8af630
Author: Cheolsoo Park <ch...@apache.org>
Authored: Thu Feb 14 23:48:42 2013 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Thu Feb 14 23:48:42 2013 -0800
----------------------------------------------------------------------
.../connector/jdbc/GenericJdbcConnectorError.java | 7 ++-
.../jdbc/GenericJdbcImportPartitioner.java | 67 ++++++++++++++-
.../connector/jdbc/TestImportPartitioner.java | 46 ++++++++++-
3 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 1fcea5f..f2ac979 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -65,7 +65,12 @@ public enum GenericJdbcConnectorError implements ErrorCode {
/** The table columns cannot be specified when
* the table sql is specified during export. */
GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified "
- + "when the table sql is specified during export");
+ + "when the table sql is specified during export"),
+
+ /** Unsupported values in partition column */
+ GENERIC_JDBC_CONNECTOR_0015("Partition column contains unsupported values"),
+
+ ;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index d276c57..6d1a9fd 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.math.BigDecimal;
import java.sql.Types;
import java.util.LinkedList;
import java.util.List;
@@ -30,6 +31,8 @@ import org.apache.sqoop.job.etl.PartitionerContext;
public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
+ // Lower limit for the split size used by partitionNumericColumn(): a computed
+ // split size smaller than this value is replaced by it.
+ private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
private long numberPartitions;
private String partitionColumnName;
private int partitionColumnType;
@@ -61,7 +64,7 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
case Types.NUMERIC:
case Types.DECIMAL:
// Decimal column
- // TODO: Add partition function
+ return partitionNumericColumn();
case Types.BIT:
case Types.BOOLEAN:
@@ -163,6 +166,68 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
return partitions;
}
+ /**
+  * Builds the partitions for a NUMERIC/DECIMAL partition column.
+  *
+  * The range between the minimal and maximal partition value is walked in
+  * steps of (max - min) / numberPartitions. When the last step does not land
+  * exactly on the maximum, the remainder is folded into the final partition.
+  *
+  * @return a single "IS NULL" partition when both bounds are null, a single
+  *         partition when both bounds hold the same value, otherwise one
+  *         partition per interval
+  * @throws SqoopException with GENERIC_JDBC_CONNECTOR_0015 when exactly one
+  *         of the two bounds is null
+  */
+ protected List<Partition> partitionNumericColumn() {
+   List<Partition> partitions = new LinkedList<Partition>();
+
+   // All null values will result in a single partition
+   if (partitionMinValue == null && partitionMaxValue == null) {
+     GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+     // The leading space keeps the column name apart from the SQL keyword.
+     partition.setConditions(partitionColumnName + " IS NULL");
+     partitions.add(partition);
+     return partitions;
+   }
+
+   // Having one end in null is not supported
+   if (partitionMinValue == null || partitionMaxValue == null) {
+     throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+   }
+
+   BigDecimal minValue = new BigDecimal(partitionMinValue);
+   BigDecimal maxValue = new BigDecimal(partitionMaxValue);
+
+   // A single distinct value cannot be split; emit exactly one partition so
+   // that the rows holding this value are not dropped.
+   if (minValue.compareTo(maxValue) == 0) {
+     GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+     // lastOne = true: presumably yields an inclusive upper bound, as the
+     // final partition does in the general case below -- confirm against
+     // constructConditions().
+     partition.setConditions(constructConditions(minValue, maxValue, true));
+     partitions.add(partition);
+     return partitions;
+   }
+
+   // Get all the split points together.
+   List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+   BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+   if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+     splitSize = NUMERIC_MIN_INCREMENT;
+   }
+
+   BigDecimal curVal = minValue;
+
+   while (curVal.compareTo(maxValue) <= 0) {
+     splitPoints.add(curVal);
+     curVal = curVal.add(splitSize);
+   }
+
+   // The walk did not end exactly on the maximum: make the maximum the final
+   // split point. The last generated point is dropped so the remainder is
+   // merged into the final interval, but the lower bound itself (a list of
+   // size one) is always kept so at least one interval remains.
+   if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0) {
+     if (splitPoints.size() > 1) {
+       splitPoints.remove(splitPoints.size() - 1);
+     }
+     splitPoints.add(maxValue);
+   }
+
+   // Turn the split points into a set of intervals.
+   BigDecimal start = splitPoints.get(0);
+   for (int i = 1; i < splitPoints.size(); i++) {
+     BigDecimal end = splitPoints.get(i);
+
+     GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+     // Only the final interval is flagged as the last one.
+     partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+     partitions.add(partition);
+
+     start = end;
+   }
+
+   return partitions;
+ }
+
+ /**
+  * Divides two decimals, tolerating quotients without an exact decimal
+  * representation.
+  *
+  * An exact quotient is returned when one exists. Otherwise the quotient is
+  * rounded half-up at the numerator's scale (10 / 3 gives 3). If that
+  * rounding collapses a non-zero quotient to zero (1 / 3 at scale 0), the
+  * quotient is recomputed with 34 significant digits instead, so callers
+  * never receive a zero step for a non-empty range.
+  *
+  * @param numerator value to divide
+  * @param denominator value to divide by
+  * @return the exact or rounded quotient
+  * @throws ArithmeticException when the denominator is zero
+  */
+ protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
+   try {
+     return numerator.divide(denominator);
+   } catch (ArithmeticException ae) {
+     // No exact representation: round at the numerator's scale.
+     BigDecimal rounded = numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+     if (rounded.signum() == 0 && numerator.signum() != 0) {
+       // Rounding wiped out the whole quotient; keep its magnitude instead.
+       return numerator.divide(denominator, java.math.MathContext.DECIMAL128);
+     }
+     return rounded;
+   }
+ }
+
protected String constructConditions(
Object lowerBound, Object upperBound, boolean lastOne) {
StringBuilder conditions = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 43eb1c2..3150e7c 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.math.BigDecimal;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
@@ -178,7 +179,6 @@ public class TestImportPartitioner extends TestCase {
context.setString(
GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
- context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
@@ -194,6 +194,50 @@ public class TestImportPartitioner extends TestCase {
});
}
+ public void testNumericEvenPartition() throws Exception {
+ MutableContext context = new MutableMapContext();
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
+
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ Partitioner partitioner = new GenericJdbcImportPartitioner();
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+ List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+ verifyResult(partitions, new String[] {
+ "-5 <= ICOL AND ICOL < -3",
+ "-3 <= ICOL AND ICOL < -1",
+ "-1 <= ICOL AND ICOL < 1",
+ "1 <= ICOL AND ICOL < 3",
+ "3 <= ICOL AND ICOL <= 5"
+ });
+ }
+
+ public void testNumericUnevenPartition() throws Exception {
+ MutableContext context = new MutableMapContext();
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
+
+ ConnectionConfiguration connConf = new ConnectionConfiguration();
+ ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+ Partitioner partitioner = new GenericJdbcImportPartitioner();
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+ List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+ verifyResult(partitions, new String[]{
+ "-5 <= DCOL AND DCOL < -2",
+ "-2 <= DCOL AND DCOL < 1",
+ "1 <= DCOL AND DCOL <= 5"
+ });
+ }
+
private void verifyResult(List<Partition> partitions,
String[] expected) {
assertEquals(expected.length, partitions.size());