You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2013/02/15 08:49:26 UTC

git commit: SQOOP-785: Add splitter implementation for NUMERIC/DECIMAL type

Updated Branches:
  refs/heads/sqoop2 c8af63092 -> 641a9c856


SQOOP-785: Add splitter implementation for NUMERIC/DECIMAL type

(Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/641a9c85
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/641a9c85
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/641a9c85

Branch: refs/heads/sqoop2
Commit: 641a9c856f2bfdcb3612a028d522969507c1e336
Parents: c8af630
Author: Cheolsoo Park <ch...@apache.org>
Authored: Thu Feb 14 23:48:42 2013 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Thu Feb 14 23:48:42 2013 -0800

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcConnectorError.java  |    7 ++-
 .../jdbc/GenericJdbcImportPartitioner.java         |   67 ++++++++++++++-
 .../connector/jdbc/TestImportPartitioner.java      |   46 ++++++++++-
 3 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 1fcea5f..f2ac979 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -65,7 +65,12 @@ public enum GenericJdbcConnectorError implements ErrorCode {
   /** The table columns cannot be specified when
    *  the table sql is specified during export. */
   GENERIC_JDBC_CONNECTOR_0014("The table columns cannot be specified "
-      + "when the table sql is specified during export");
+      + "when the table sql is specified during export"),
+
+  /** Unsupported values in partition column */
+  GENERIC_JDBC_CONNECTOR_0015("Partition column contains unsupported values"),
+
+  ;
 
   private final String message;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index d276c57..6d1a9fd 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.LinkedList;
 import java.util.List;
@@ -30,6 +31,8 @@ import org.apache.sqoop.job.etl.PartitionerContext;
 
 public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
 
+  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
   private long numberPartitions;
   private String partitionColumnName;
   private int partitionColumnType;
@@ -61,7 +64,7 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
     case Types.NUMERIC:
     case Types.DECIMAL:
       // Decimal column
-      // TODO: Add partition function
+      return partitionNumericColumn();
 
     case Types.BIT:
     case Types.BOOLEAN:
@@ -163,6 +166,68 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
     return partitions;
   }
 
+  /**
+   * Splits the [min, max] range of a NUMERIC/DECIMAL partition column into
+   * consecutive intervals, one partition per interval. Two null bounds give a
+   * single "IS NULL" partition; exactly one null bound is rejected.
+   *
+   * @return the partitions covering the column's value range
+   * @throws SqoopException GENERIC_JDBC_CONNECTOR_0015 if only one bound is null
+   */
+  protected List<Partition> partitionNumericColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+    // All null values will result in a single partition
+    if (partitionMinValue == null && partitionMaxValue == null) {
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(partitionColumnName + " IS NULL");
+      partitions.add(partition);
+      return partitions;
+    }
+    // Having only one end in null is not supported
+    if (partitionMinValue == null || partitionMaxValue == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
+    }
+    BigDecimal minValue = new BigDecimal(partitionMinValue);
+    BigDecimal maxValue = new BigDecimal(partitionMaxValue);
+    // Never step by less than NUMERIC_MIN_INCREMENT (this also covers a zero step).
+    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+    for (BigDecimal curVal = minValue; curVal.compareTo(maxValue) <= 0; curVal = curVal.add(splitSize)) {
+      splitPoints.add(curVal);
+    }
+    // End on the maximum: append it if only the minimum was collected, else replace a short last point.
+    int last = splitPoints.size() - 1;
+    if (last == 0) {
+      splitPoints.add(maxValue);
+    } else if (splitPoints.get(last).compareTo(maxValue) != 0) {
+      splitPoints.set(last, maxValue);
+    }
+
+    // Turn the split points into a set of intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
+      partitions.add(partition);
+      start = end;
+    }
+    return partitions;
+  }
+
+  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
+    try { // Prefer the exact quotient; BigDecimal throws when it cannot be represented exactly.
+      return numerator.divide(denominator);
+    } catch (ArithmeticException inexact) { // Fall back to half-up rounding at the numerator's scale.
+      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+    }
+  }
+
   protected String constructConditions(
       Object lowerBound, Object upperBound, boolean lastOne) {
     StringBuilder conditions = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/641a9c85/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 43eb1c2..3150e7c 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.math.BigDecimal;
 import java.sql.Types;
 import java.util.Iterator;
 import java.util.List;
@@ -178,7 +179,6 @@ public class TestImportPartitioner extends TestCase {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
@@ -194,6 +194,50 @@ public class TestImportPartitioner extends TestCase {
     });
   }
 
+  /** NUMERIC column "ICOL" with a PartitionerContext built with 5: expects five contiguous ranges. */
+  public void testNumericEvenPartition() throws Exception {
+    MutableContext ctx = new MutableMapContext();
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "ICOL");
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(new PartitionerContext(ctx, 5), connConf, jobConf);
+
+    String[] expected = {
+        "-5 <= ICOL AND ICOL < -3",
+        "-3 <= ICOL AND ICOL < -1",
+        "-1 <= ICOL AND ICOL < 1",
+        "1 <= ICOL AND ICOL < 3",
+        "3 <= ICOL AND ICOL <= 5"
+    };
+    verifyResult(partitions, expected);
+  }
+
+  /** NUMERIC column "DCOL" with a PartitionerContext built with 3: the last range absorbs the remainder. */
+  public void testNumericUnevenPartition() throws Exception {
+    MutableContext ctx = new MutableMapContext();
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.NUMERIC));
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, new BigDecimal(START).toString());
+    ctx.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, new BigDecimal(START + NUMBER_OF_ROWS - 1).toString());
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    List<Partition> partitions = partitioner.getPartitions(new PartitionerContext(ctx, 3), connConf, jobConf);
+
+    String[] expected = {
+      "-5 <= DCOL AND DCOL < -2",
+      "-2 <= DCOL AND DCOL < 1",
+      "1 <= DCOL AND DCOL <= 5"
+    };
+    verifyResult(partitions, expected);
+  }
+
   private void verifyResult(List<Partition> partitions,
       String[] expected) {
     assertEquals(expected.length, partitions.size());