You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:22 UTC
[4/5] SQOOP-1376: Sqoop2: From/To: Refactor connector interface
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
new file mode 100644
index 0000000..55d386b
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
+import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+
+// Initializer for the FROM (extract) side of the generic JDBC connector.
+// Validates/derives job properties (partition column, boundary query, data SQL)
+// and publishes them into the job context before extraction starts.
+public class GenericJdbcFromInitializer extends Initializer<ConnectionConfiguration, FromJobConfiguration> {
+
+ private static final Logger LOG =
+ Logger.getLogger(GenericJdbcFromInitializer.class);
+
+ // JDBC session helper; created lazily by configureJdbcProperties().
+ // NOTE(review): initialize() closes it, but getSchema() does not — see below.
+ private GenericJdbcExecutor executor;
+ // Opens a JDBC session, then computes and stores partition and table
+ // properties in the job context. The executor is always closed, even if
+ // configuration throws.
+ @Override
+ public void initialize(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+ configureJdbcProperties(context.getContext(), connection, job);
+ try {
+ configurePartitionProperties(context.getContext(), connection, job);
+ configureTableProperties(context.getContext(), connection, job);
+ } finally {
+ executor.close();
+ }
+ }
+
+ // Returns the jar(s) that must ship with the job: only the jar that
+ // contains the configured JDBC driver class.
+ @Override
+ public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+ List<String> jars = new LinkedList<String>();
+
+ jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
+
+ return jars;
+ }
+
+ // Derives the schema of the FROM data set by running the configured data
+ // SQL with its conditions token replaced by an always-false predicate
+ // ("1 = 0"), so only ResultSet metadata is produced, never rows.
+ // Schema name is the table name, or the literal "Query" for free-form SQL.
+ // NOTE(review): configureJdbcProperties() opens a new executor here, but
+ // unlike initialize() this method never calls executor.close() — the JDBC
+ // connection appears to leak; confirm and close in a finally block.
+ @Override
+ public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, FromJobConfiguration fromJobConfiguration) {
+ configureJdbcProperties(context.getContext(), connectionConfiguration, fromJobConfiguration);
+
+ String schemaName = fromJobConfiguration.table.tableName;
+ if(schemaName == null) {
+ schemaName = "Query";
+ }
+
+ Schema schema = new Schema(schemaName);
+
+ ResultSet rs = null;
+ ResultSetMetaData rsmt = null;
+ try {
+ rs = executor.executeQuery(
+ context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL)
+ .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+ );
+
+ rsmt = rs.getMetaData();
+ // Map each SQL column type to the abstract Sqoop column type; JDBC
+ // metadata columns are 1-based.
+ for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
+ Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+ // Prefer the physical column name, fall back to its label, and as a
+ // last resort synthesize "Column <i>".
+ String columnName = rsmt.getColumnName(i);
+ if (columnName == null || columnName.equals("")) {
+ columnName = rsmt.getColumnLabel(i);
+ if (null == columnName) {
+ columnName = "Column " + i;
+ }
+ }
+
+ column.setName(columnName);
+ schema.addColumn(column);
+ }
+
+ return schema;
+ } catch (SQLException e) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+ } finally {
+ // Best-effort close of the ResultSet; a close failure is not fatal.
+ if(rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ LOG.info("Ignoring exception while closing ResultSet", e);
+ }
+ }
+ }
+ }
+
+ // Reads driver/url/credentials from the connection configuration and opens
+ // a GenericJdbcExecutor stored in the 'executor' field. Driver and url are
+ // required (asserted, so only checked when -ea is enabled).
+ private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+ String driver = connectionConfig.connection.jdbcDriver;
+ String url = connectionConfig.connection.connectionString;
+ String username = connectionConfig.connection.username;
+ String password = connectionConfig.connection.password;
+
+ assert driver != null;
+ assert url != null;
+
+ executor = new GenericJdbcExecutor(driver, url, username, password);
+ }
+
+ // Determines the partition column (explicit config or the table's primary
+ // key), builds or reuses a min/max boundary query, runs it, and stores the
+ // column name/type and min/max boundary values in the job context.
+ // Throws GENERIC_JDBC_CONNECTOR_0005 when no partition column can be found.
+ private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+ // ----- configure column name -----
+
+ String partitionColumnName = jobConfig.table.partitionColumn;
+
+ if (partitionColumnName == null) {
+ // if column is not specified by the user,
+ // find the primary key of the table (when there is a table).
+ String tableName = jobConfig.table.tableName;
+ if (tableName != null) {
+ partitionColumnName = executor.getPrimaryKey(tableName);
+ }
+ }
+
+ if (partitionColumnName != null) {
+ context.setString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
+ partitionColumnName);
+
+ } else {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
+ }
+
+ // ----- configure column type, min value, and max value -----
+
+ // A user-supplied boundary query wins; otherwise synthesize
+ // SELECT MIN(col), MAX(col) over the table or the wrapped subquery.
+ String minMaxQuery = jobConfig.table.boundaryQuery;
+
+ if (minMaxQuery == null) {
+ StringBuilder builder = new StringBuilder();
+
+ String schemaName = jobConfig.table.schemaName;
+ String tableName = jobConfig.table.tableName;
+ String tableSql = jobConfig.table.sql;
+
+ if (tableName != null && tableSql != null) {
+ // when both table name and table sql are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+ } else if (tableName != null) {
+ // when table name is specified:
+
+ // For databases that support schemas (IE: postgresql).
+ String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ // NOTE(review): the partition column is concatenated into the SQL
+ // without delimiting/escaping, unlike the table name above — confirm
+ // whether column names with special characters are supported.
+ String column = partitionColumnName;
+ builder.append("SELECT MIN(");
+ builder.append(column);
+ builder.append("), MAX(");
+ builder.append(column);
+ builder.append(") FROM ");
+ builder.append(fullTableName);
+
+ } else if (tableSql != null) {
+ // Free-form SQL: qualify the column with the subquery alias and make
+ // the conditions token a no-op ("1 = 1") so the full range is scanned.
+ String column = executor.qualify(
+ partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+ builder.append("SELECT MIN(");
+ builder.append(column);
+ builder.append("), MAX(");
+ builder.append(column);
+ builder.append(") FROM ");
+ builder.append("(");
+ builder.append(tableSql.replace(
+ GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
+ builder.append(") ");
+ builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+
+ } else {
+ // when neither are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+ }
+
+ minMaxQuery = builder.toString();
+ }
+
+
+ LOG.debug("Using minMaxQuery: " + minMaxQuery);
+ // NOTE(review): this ResultSet is never closed (no finally), and the
+ // rs.next() return value is not checked — an empty result would surface
+ // as a SQLException from getString rather than a clear error; confirm.
+ ResultSet rs = executor.executeQuery(minMaxQuery);
+ try {
+ ResultSetMetaData rsmd = rs.getMetaData();
+ if (rsmd.getColumnCount() != 2) {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+ }
+
+ rs.next();
+
+ int columnType = rsmd.getColumnType(1);
+ String min = rs.getString(1);
+ String max = rs.getString(2);
+
+ LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
+
+ context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
+
+ } catch (SQLException e) {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
+ }
+ }
+
+ // Builds the extraction data SQL (with the SQL_CONDITIONS_TOKEN left in
+ // place for partition predicates) and the comma-separated field-name list,
+ // then stores both in the job context. Exactly one of table name or
+ // free-form SQL must be configured (0007 if both, 0008 if neither).
+ private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, FromJobConfiguration jobConfig) {
+ String dataSql;
+ String fieldNames;
+
+ String schemaName = jobConfig.table.schemaName;
+ String tableName = jobConfig.table.tableName;
+ String tableSql = jobConfig.table.sql;
+ String tableColumns = jobConfig.table.columns;
+
+ if (tableName != null && tableSql != null) {
+ // when both table name and table sql are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+ } else if (tableName != null) {
+ // when table name is specified:
+
+ // For databases that support schemas (IE: postgresql).
+ String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+
+ if (tableColumns == null) {
+ // SELECT * — discover the concrete column list by probing the query
+ // with an always-false condition ("1 = 0"), metadata only.
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT * FROM ");
+ builder.append(fullTableName);
+ builder.append(" WHERE ");
+ builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+ dataSql = builder.toString();
+
+ String[] queryColumns = executor.getQueryColumns(dataSql.replace(
+ GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+ fieldNames = StringUtils.join(queryColumns, ',');
+
+ } else {
+ // Explicit column list: embed it verbatim in the SELECT.
+ // NOTE(review): tableColumns is concatenated into the SQL without
+ // delimiting — confirm handling of unusual column names.
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT ");
+ builder.append(tableColumns);
+ builder.append(" FROM ");
+ builder.append(fullTableName);
+ builder.append(" WHERE ");
+ builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+ dataSql = builder.toString();
+
+ fieldNames = tableColumns;
+ }
+ } else if (tableSql != null) {
+ // when table sql is specified:
+
+ // The free-form SQL must carry the conditions placeholder so partition
+ // predicates can be substituted later (assert: only checked with -ea).
+ assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
+
+ if (tableColumns == null) {
+ // Use the SQL as-is; probe it with "1 = 0" to learn the field names.
+ dataSql = tableSql;
+
+ String[] queryColumns = executor.getQueryColumns(dataSql.replace(
+ GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
+ fieldNames = StringUtils.join(queryColumns, ',');
+
+ } else {
+ // Wrap the SQL as a subquery and project the requested columns,
+ // each qualified with the subquery alias.
+ String[] columns = StringUtils.split(tableColumns, ',');
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT ");
+ builder.append(executor.qualify(
+ columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
+ for (int i = 1; i < columns.length; i++) {
+ builder.append(",");
+ builder.append(executor.qualify(
+ columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
+ }
+ builder.append(" FROM ");
+ builder.append("(");
+ builder.append(tableSql);
+ builder.append(") ");
+ builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+ dataSql = builder.toString();
+
+ fieldNames = tableColumns;
+ }
+ } else {
+ // when neither are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+ }
+
+ LOG.info("Using dataSql: " + dataSql);
+ LOG.info("Field names: " + fieldNames);
+
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL, dataSql);
+ context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
deleted file mode 100644
index 2cf07fe..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> {
-
- private static final Logger LOG =
- Logger.getLogger(GenericJdbcImportDestroyer.class);
-
- @Override
- public void destroy(DestroyerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
- LOG.info("Running generic JDBC connector destroyer");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
deleted file mode 100644
index 3f9aa9b..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.sqoop.job.etl.Extractor;
-
-public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
-
- public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class);
-
- private long rowsRead = 0;
- @Override
- public void extract(ExtractorContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition) {
- String driver = connection.connection.jdbcDriver;
- String url = connection.connection.connectionString;
- String username = connection.connection.username;
- String password = connection.connection.password;
- GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
-
- String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
- String conditions = partition.getConditions();
- query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
- LOG.info("Using query: " + query);
-
- rowsRead = 0;
- ResultSet resultSet = executor.executeQuery(query);
-
- try {
- ResultSetMetaData metaData = resultSet.getMetaData();
- int column = metaData.getColumnCount();
- while (resultSet.next()) {
- Object[] array = new Object[column];
- for (int i = 0; i< column; i++) {
- array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE
- : resultSet.getObject(i + 1);
- }
- context.getDataWriter().writeArrayRecord(array);
- rowsRead++;
- }
- } catch (SQLException e) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
-
- } finally {
- executor.close();
- }
- }
-
- @Override
- public long getRowsRead() {
- return rowsRead;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
deleted file mode 100644
index 96818ba..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
-import org.apache.sqoop.job.Constants;
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
-
- private static final Logger LOG =
- Logger.getLogger(GenericJdbcImportInitializer.class);
-
- private GenericJdbcExecutor executor;
-
- @Override
- public void initialize(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
- configureJdbcProperties(context.getContext(), connection, job);
- try {
- configurePartitionProperties(context.getContext(), connection, job);
- configureTableProperties(context.getContext(), connection, job);
- } finally {
- executor.close();
- }
- }
-
- @Override
- public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
- List<String> jars = new LinkedList<String>();
-
- jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
-
- return jars;
- }
-
- @Override
- public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) {
- configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration);
-
- String schemaName = importJobConfiguration.table.tableName;
- if(schemaName == null) {
- schemaName = "Query";
- }
-
- Schema schema = new Schema(schemaName);
-
- ResultSet rs = null;
- ResultSetMetaData rsmt = null;
- try {
- rs = executor.executeQuery(
- context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
- .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
- );
-
- rsmt = rs.getMetaData();
- for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
- Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
-
- String columnName = rsmt.getColumnName(i);
- if (columnName == null || columnName.equals("")) {
- columnName = rsmt.getColumnLabel(i);
- if (null == columnName) {
- columnName = "Column " + i;
- }
- }
-
- column.setName(columnName);
- schema.addColumn(column);
- }
-
- return schema;
- } catch (SQLException e) {
- throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
- } finally {
- if(rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- LOG.info("Ignoring exception while closing ResultSet", e);
- }
- }
- }
- }
-
- private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
- String driver = connectionConfig.connection.jdbcDriver;
- String url = connectionConfig.connection.connectionString;
- String username = connectionConfig.connection.username;
- String password = connectionConfig.connection.password;
-
- assert driver != null;
- assert url != null;
-
- executor = new GenericJdbcExecutor(driver, url, username, password);
- }
-
- private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
- // ----- configure column name -----
-
- String partitionColumnName = jobConfig.table.partitionColumn;
-
- if (partitionColumnName == null) {
- // if column is not specified by the user,
- // find the primary key of the table (when there is a table).
- String tableName = jobConfig.table.tableName;
- if (tableName != null) {
- partitionColumnName = executor.getPrimaryKey(tableName);
- }
- }
-
- if (partitionColumnName != null) {
- context.setString(
- GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
- partitionColumnName);
-
- } else {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
- }
-
- // ----- configure column type, min value, and max value -----
-
- String minMaxQuery = jobConfig.table.boundaryQuery;
-
- if (minMaxQuery == null) {
- StringBuilder builder = new StringBuilder();
-
- String schemaName = jobConfig.table.schemaName;
- String tableName = jobConfig.table.tableName;
- String tableSql = jobConfig.table.sql;
-
- if (tableName != null && tableSql != null) {
- // when both table name and table sql are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
- } else if (tableName != null) {
- // when table name is specified:
-
- // For databases that support schemas (IE: postgresql).
- String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
- String column = partitionColumnName;
- builder.append("SELECT MIN(");
- builder.append(column);
- builder.append("), MAX(");
- builder.append(column);
- builder.append(") FROM ");
- builder.append(fullTableName);
-
- } else if (tableSql != null) {
- String column = executor.qualify(
- partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
- builder.append("SELECT MIN(");
- builder.append(column);
- builder.append("), MAX(");
- builder.append(column);
- builder.append(") FROM ");
- builder.append("(");
- builder.append(tableSql.replace(
- GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
- builder.append(") ");
- builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
-
- } else {
- // when neither are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
- }
-
- minMaxQuery = builder.toString();
- }
-
-
- LOG.debug("Using minMaxQuery: " + minMaxQuery);
- ResultSet rs = executor.executeQuery(minMaxQuery);
- try {
- ResultSetMetaData rsmd = rs.getMetaData();
- if (rsmd.getColumnCount() != 2) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
- }
-
- rs.next();
-
- int columnType = rsmd.getColumnType(1);
- String min = rs.getString(1);
- String max = rs.getString(2);
-
- LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
-
- context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
- context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
- context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
-
- } catch (SQLException e) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
- }
- }
-
- private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
- String dataSql;
- String fieldNames;
-
- String schemaName = jobConfig.table.schemaName;
- String tableName = jobConfig.table.tableName;
- String tableSql = jobConfig.table.sql;
- String tableColumns = jobConfig.table.columns;
-
- if (tableName != null && tableSql != null) {
- // when both table name and table sql are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
- } else if (tableName != null) {
- // when table name is specified:
-
- // For databases that support schemas (IE: postgresql).
- String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
-
- if (tableColumns == null) {
- StringBuilder builder = new StringBuilder();
- builder.append("SELECT * FROM ");
- builder.append(fullTableName);
- builder.append(" WHERE ");
- builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
- dataSql = builder.toString();
-
- String[] queryColumns = executor.getQueryColumns(dataSql.replace(
- GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
- fieldNames = StringUtils.join(queryColumns, ',');
-
- } else {
- StringBuilder builder = new StringBuilder();
- builder.append("SELECT ");
- builder.append(tableColumns);
- builder.append(" FROM ");
- builder.append(fullTableName);
- builder.append(" WHERE ");
- builder.append(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
- dataSql = builder.toString();
-
- fieldNames = tableColumns;
- }
- } else if (tableSql != null) {
- // when table sql is specified:
-
- assert tableSql.contains(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN);
-
- if (tableColumns == null) {
- dataSql = tableSql;
-
- String[] queryColumns = executor.getQueryColumns(dataSql.replace(
- GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));
- fieldNames = StringUtils.join(queryColumns, ',');
-
- } else {
- String[] columns = StringUtils.split(tableColumns, ',');
- StringBuilder builder = new StringBuilder();
- builder.append("SELECT ");
- builder.append(executor.qualify(
- columns[0], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
- for (int i = 1; i < columns.length; i++) {
- builder.append(",");
- builder.append(executor.qualify(
- columns[i], GenericJdbcConnectorConstants.SUBQUERY_ALIAS));
- }
- builder.append(" FROM ");
- builder.append("(");
- builder.append(tableSql);
- builder.append(") ");
- builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
- dataSql = builder.toString();
-
- fieldNames = tableColumns;
- }
- } else {
- // when neither are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
- }
-
- LOG.info("Using dataSql: " + dataSql);
- LOG.info("Field names: " + fieldNames);
-
- context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL, dataSql);
- context.setString(Constants.JOB_ETL_FIELD_NAMES, fieldNames);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
deleted file mode 100644
index 66ed556..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartition.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.sqoop.job.etl.Partition;
-
-public class GenericJdbcImportPartition extends Partition {
-
- private String conditions;
-
- public void setConditions(String conditions) {
- this.conditions = conditions;
- }
-
- public String getConditions() {
- return conditions;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- conditions = in.readUTF();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(conditions);
- }
-
- @Override
- public String toString() {
- return conditions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
deleted file mode 100644
index d103223..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.job.etl.PartitionerContext;
-
-public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
-
- private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
-
-
- private long numberPartitions;
- private String partitionColumnName;
- private int partitionColumnType;
- private String partitionMinValue;
- private String partitionMaxValue;
- private Boolean partitionColumnNull;
-
- @Override
- public List<Partition> getPartitions(PartitionerContext context,ConnectionConfiguration connection, ImportJobConfiguration job) {
- List<Partition> partitions = new LinkedList<Partition>();
-
- numberPartitions = context.getMaxPartitions();
- partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
- partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
- partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
- partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
-
- partitionColumnNull = job.table.partitionColumnNull;
- if (partitionColumnNull == null) {
- partitionColumnNull = false;
- }
-
- if (partitionMinValue == null && partitionMaxValue == null) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(partitionColumnName + " IS NULL");
- partitions.add(partition);
- return partitions;
- }
-
- if (partitionColumnNull) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(partitionColumnName + " IS NULL");
- partitions.add(partition);
- numberPartitions -= 1;
- }
-
- switch (partitionColumnType) {
- case Types.TINYINT:
- case Types.SMALLINT:
- case Types.INTEGER:
- case Types.BIGINT:
- // Integer column
- partitions.addAll(partitionIntegerColumn());
- break;
-
- case Types.REAL:
- case Types.FLOAT:
- case Types.DOUBLE:
- // Floating point column
- partitions.addAll(partitionFloatingPointColumn());
- break;
-
- case Types.NUMERIC:
- case Types.DECIMAL:
- // Decimal column
- partitions.addAll(partitionNumericColumn());
- break;
-
- case Types.BIT:
- case Types.BOOLEAN:
- // Boolean column
- return partitionBooleanColumn();
-
- case Types.DATE:
- case Types.TIME:
- case Types.TIMESTAMP:
- // Date time column
- partitions.addAll(partitionDateTimeColumn());
- break;
-
- case Types.CHAR:
- case Types.VARCHAR:
- case Types.LONGVARCHAR:
- // Text column
- partitions.addAll(partitionTextColumn());
- break;
-
- default:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
- String.valueOf(partitionColumnType));
- }
-
- return partitions;
- }
-
- protected List<Partition> partitionDateTimeColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
-
- long minDateValue = 0;
- long maxDateValue = 0;
- SimpleDateFormat sdf = null;
- switch(partitionColumnType) {
- case Types.DATE:
- sdf = new SimpleDateFormat("yyyy-MM-dd");
- minDateValue = Date.valueOf(partitionMinValue).getTime();
- maxDateValue = Date.valueOf(partitionMaxValue).getTime();
- break;
- case Types.TIME:
- sdf = new SimpleDateFormat("HH:mm:ss");
- minDateValue = Time.valueOf(partitionMinValue).getTime();
- maxDateValue = Time.valueOf(partitionMaxValue).getTime();
- break;
- case Types.TIMESTAMP:
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
- maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
- break;
- }
-
-
- minDateValue += TimeZone.getDefault().getOffset(minDateValue);
- maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
-
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- long interval = (maxDateValue - minDateValue) / numberPartitions;
- long remainder = (maxDateValue - minDateValue) % numberPartitions;
-
- if (interval == 0) {
- numberPartitions = (int)remainder;
- }
-
- long lowerBound;
- long upperBound = minDateValue;
-
- Object objLB = null;
- Object objUB = null;
-
- for (int i = 1; i < numberPartitions; i++) {
- lowerBound = upperBound;
- upperBound = lowerBound + interval;
- upperBound += (i <= remainder) ? 1 : 0;
-
- switch(partitionColumnType) {
- case Types.DATE:
- objLB = new Date(lowerBound);
- objUB = new Date(upperBound);
- break;
- case Types.TIME:
- objLB = new Time(lowerBound);
- objUB = new Time(upperBound);
-
- break;
- case Types.TIMESTAMP:
- objLB = new Timestamp(lowerBound);
- objUB = new Timestamp(upperBound);
- break;
- }
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructDateConditions(sdf, objLB, objUB, false));
- partitions.add(partition);
- }
-
- switch(partitionColumnType) {
- case Types.DATE:
- objLB = new Date(upperBound);
- objUB = new Date(maxDateValue);
- break;
- case Types.TIME:
- objLB = new Time(upperBound);
- objUB = new Time(maxDateValue);
- break;
- case Types.TIMESTAMP:
- objLB = new Timestamp(upperBound);
- objUB = new Timestamp(maxDateValue);
- break;
- }
-
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructDateConditions(sdf, objLB, objUB, true));
- partitions.add(partition);
- return partitions;
- }
-
- protected List<Partition> partitionTextColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
-
- String minStringValue = null;
- String maxStringValue = null;
-
- // Remove common prefix if any as it does not affect outcome.
- int maxPrefixLen = Math.min(partitionMinValue.length(),
- partitionMaxValue.length());
- // Calculate common prefix length
- int cpLen = 0;
-
- for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
- char c1 = partitionMinValue.charAt(cpLen);
- char c2 = partitionMaxValue.charAt(cpLen);
- if (c1 != c2) {
- break;
- }
- }
-
- // The common prefix has length 'sharedLen'. Extract it from both.
- String prefix = partitionMinValue.substring(0, cpLen);
- minStringValue = partitionMinValue.substring(cpLen);
- maxStringValue = partitionMaxValue.substring(cpLen);
-
- BigDecimal minStringBD = textToBigDecimal(minStringValue);
- BigDecimal maxStringBD = textToBigDecimal(maxStringValue);
-
- // Having one single value means that we can create only one single split
- if(minStringBD.equals(maxStringBD)) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(constructTextConditions(prefix, 0, 0,
- partitionMinValue, partitionMaxValue, true, true));
- partitions.add(partition);
- return partitions;
- }
-
- // Get all the split points together.
- List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
-
- BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
- new BigDecimal(numberPartitions));
- if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
- splitSize = NUMERIC_MIN_INCREMENT;
- }
-
- BigDecimal curVal = minStringBD;
-
- int parts = 0;
-
- while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
- splitPoints.add(curVal);
- curVal = curVal.add(splitSize);
- // bigDecimalToText approximates to next comparison location.
- // Make sure we are still in range
- String text = bigDecimalToText(curVal);
- curVal = textToBigDecimal(text);
- ++parts;
- }
-
- if (splitPoints.size() == 0
- || splitPoints.get(0).compareTo(minStringBD) != 0) {
- splitPoints.add(0, minStringBD);
- }
-
- if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
- || splitPoints.size() == 1) {
- splitPoints.add(maxStringBD);
- }
-
- // Turn the split points into a set of string intervals.
- BigDecimal start = splitPoints.get(0);
- for (int i = 1; i < splitPoints.size(); i++) {
- BigDecimal end = splitPoints.get(i);
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(constructTextConditions(prefix, start, end,
- partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
- partitions.add(partition);
-
- start = end;
- }
-
- return partitions;
- }
-
-
- protected List<Partition> partitionIntegerColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
-
- long minValue = partitionMinValue == null ? Long.MIN_VALUE
- : Long.parseLong(partitionMinValue);
- long maxValue = Long.parseLong(partitionMaxValue);
-
- long interval = (maxValue - minValue) / numberPartitions;
- long remainder = (maxValue - minValue) % numberPartitions;
-
- if (interval == 0) {
- numberPartitions = (int)remainder;
- }
-
- long lowerBound;
- long upperBound = minValue;
- for (int i = 1; i < numberPartitions; i++) {
- lowerBound = upperBound;
- upperBound = lowerBound + interval;
- upperBound += (i <= remainder) ? 1 : 0;
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructConditions(lowerBound, upperBound, false));
- partitions.add(partition);
- }
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructConditions(upperBound, maxValue, true));
- partitions.add(partition);
-
- return partitions;
- }
-
- protected List<Partition> partitionFloatingPointColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
-
-
- double minValue = partitionMinValue == null ? Double.MIN_VALUE
- : Double.parseDouble(partitionMinValue);
- double maxValue = Double.parseDouble(partitionMaxValue);
-
- double interval = (maxValue - minValue) / numberPartitions;
-
- double lowerBound;
- double upperBound = minValue;
- for (int i = 1; i < numberPartitions; i++) {
- lowerBound = upperBound;
- upperBound = lowerBound + interval;
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructConditions(lowerBound, upperBound, false));
- partitions.add(partition);
- }
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(
- constructConditions(upperBound, maxValue, true));
- partitions.add(partition);
-
- return partitions;
- }
-
- protected List<Partition> partitionNumericColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
- // Having one end in null is not supported
- if (partitionMinValue == null || partitionMaxValue == null) {
- throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
- }
-
- BigDecimal minValue = new BigDecimal(partitionMinValue);
- BigDecimal maxValue = new BigDecimal(partitionMaxValue);
-
- // Having one single value means that we can create only one single split
- if(minValue.equals(maxValue)) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(constructConditions(minValue));
- partitions.add(partition);
- return partitions;
- }
-
- // Get all the split points together.
- List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
-
- BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
-
- if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
- splitSize = NUMERIC_MIN_INCREMENT;
- }
-
- BigDecimal curVal = minValue;
-
- while (curVal.compareTo(maxValue) <= 0) {
- splitPoints.add(curVal);
- curVal = curVal.add(splitSize);
- }
-
- if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
- splitPoints.remove(splitPoints.size() - 1);
- splitPoints.add(maxValue);
- }
-
- // Turn the split points into a set of intervals.
- BigDecimal start = splitPoints.get(0);
- for (int i = 1; i < splitPoints.size(); i++) {
- BigDecimal end = splitPoints.get(i);
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
- partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
- partitions.add(partition);
-
- start = end;
- }
-
- return partitions;
- }
-
- protected List<Partition> partitionBooleanColumn() {
- List<Partition> partitions = new LinkedList<Partition>();
-
-
- Boolean minValue = parseBooleanValue(partitionMinValue);
- Boolean maxValue = parseBooleanValue(partitionMaxValue);
-
- StringBuilder conditions = new StringBuilder();
-
- // Having one single value means that we can create only one single split
- if(minValue.equals(maxValue)) {
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-
- conditions.append(partitionColumnName).append(" = ")
- .append(maxValue);
- partition.setConditions(conditions.toString());
- partitions.add(partition);
- return partitions;
- }
-
- GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-
- if (partitionMinValue == null) {
- conditions = new StringBuilder();
- conditions.append(partitionColumnName).append(" IS NULL");
- partition.setConditions(conditions.toString());
- partitions.add(partition);
- }
- partition = new GenericJdbcImportPartition();
- conditions = new StringBuilder();
- conditions.append(partitionColumnName).append(" = TRUE");
- partition.setConditions(conditions.toString());
- partitions.add(partition);
- partition = new GenericJdbcImportPartition();
- conditions = new StringBuilder();
- conditions.append(partitionColumnName).append(" = FALSE");
- partition.setConditions(conditions.toString());
- partitions.add(partition);
- return partitions;
- }
-
- private Boolean parseBooleanValue(String value) {
- if (value == null) {
- return null;
- }
- if (value.equals("1")) {
- return Boolean.TRUE;
- } else if (value.equals("0")) {
- return Boolean.FALSE;
- } else {
- return Boolean.parseBoolean(value);
- }
- }
-
- protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
- try {
- return numerator.divide(denominator);
- } catch (ArithmeticException ae) {
- return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
- }
- }
-
- protected String constructConditions(
- Object lowerBound, Object upperBound, boolean lastOne) {
- StringBuilder conditions = new StringBuilder();
- conditions.append(lowerBound);
- conditions.append(" <= ");
- conditions.append(partitionColumnName);
- conditions.append(" AND ");
- conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append(upperBound);
- return conditions.toString();
- }
-
- protected String constructConditions(Object value) {
- return new StringBuilder()
- .append(partitionColumnName)
- .append(" = ")
- .append(value)
- .toString()
- ;
- }
-
- protected String constructDateConditions(SimpleDateFormat sdf,
- Object lowerBound, Object upperBound, boolean lastOne) {
- StringBuilder conditions = new StringBuilder();
- conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
- conditions.append(" <= ");
- conditions.append(partitionColumnName);
- conditions.append(" AND ");
- conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
- return conditions.toString();
- }
-
- protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
- String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
- StringBuilder conditions = new StringBuilder();
- String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
- String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
- conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
- conditions.append(" <= ");
- conditions.append(partitionColumnName);
- conditions.append(" AND ");
- conditions.append(partitionColumnName);
- conditions.append(lastOne ? " <= " : " < ");
- conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
- return conditions.toString();
- }
-
- /**
- * Converts a string to a BigDecimal representation in Base 2^21 format.
- * The maximum Unicode code point value defined is 10FFFF. Although
- * not all database system support UTF16 and mostly we expect UCS2
- * characters only, for completeness, we assume that all the unicode
- * characters are supported.
- * Given a string 's' containing characters s_0, s_1,..s_n,
- * the string is interpreted as the number: 0.s_0 s_1 s_2 s_3 s_48)
- * This can be split and each split point can be converted back to
- * a string value for comparison purposes. The number of characters
- * is restricted to prevent repeating fractions and rounding errors
- * towards the higher fraction positions.
- */
- private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
- private static final int MAX_CHARS_TO_CONVERT = 4;
-
- private BigDecimal textToBigDecimal(String str) {
- BigDecimal result = BigDecimal.ZERO;
- BigDecimal divisor = UNITS_BASE;
-
- int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
-
- for (int n = 0; n < len; ) {
- int codePoint = str.codePointAt(n);
- n += Character.charCount(codePoint);
- BigDecimal val = divide(new BigDecimal(codePoint), divisor);
- result = result.add(val);
- divisor = divisor.multiply(UNITS_BASE);
- }
-
- return result;
- }
-
- private String bigDecimalToText(BigDecimal bd) {
- BigDecimal curVal = bd.stripTrailingZeros();
- StringBuilder sb = new StringBuilder();
-
- for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
- curVal = curVal.multiply(UNITS_BASE);
- int cp = curVal.intValue();
- if (0 >= cp) {
- break;
- }
-
- if (!Character.isDefined(cp)) {
- int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
- // We are guaranteed to find at least one character
- while(!Character.isDefined(t_cp)) {
- ++t_cp;
- if (t_cp == cp) {
- break;
- }
- if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0) {
- t_cp = 1;
- }
- }
- cp = t_cp;
- }
- curVal = curVal.subtract(new BigDecimal(cp));
- sb.append(Character.toChars(cp));
- }
-
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
new file mode 100644
index 0000000..7d583c5
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+public class GenericJdbcLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> {
+
+ public static final int DEFAULT_ROWS_PER_BATCH = 100;
+ public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
+ private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
+ private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
+
+ @Override
+ public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception{
+ String driver = connection.connection.jdbcDriver;
+ String url = connection.connection.connectionString;
+ String username = connection.connection.username;
+ String password = connection.connection.password;
+ GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
+ executor.setAutoCommit(false);
+
+ String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL);
+ executor.beginBatch(sql);
+ try {
+ int numberOfRows = 0;
+ int numberOfBatches = 0;
+ Object[] array;
+
+ while ((array = context.getDataReader().readArrayRecord()) != null) {
+ numberOfRows++;
+ executor.addBatch(array);
+
+ if (numberOfRows == rowsPerBatch) {
+ numberOfBatches++;
+ if (numberOfBatches == batchesPerTransaction) {
+ executor.executeBatch(true);
+ numberOfBatches = 0;
+ } else {
+ executor.executeBatch(false);
+ }
+ numberOfRows = 0;
+ }
+ }
+
+ if (numberOfRows != 0 || numberOfBatches != 0) {
+ // execute and commit the remaining rows
+ executor.executeBatch(true);
+ }
+
+ executor.endBatch();
+
+ } finally {
+ executor.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
new file mode 100644
index 0000000..65400ef
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartition.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.sqoop.job.etl.Partition;
+
+public class GenericJdbcPartition extends Partition {
+
+ private String conditions;
+
+ public void setConditions(String conditions) {
+ this.conditions = conditions;
+ }
+
+ public String getConditions() {
+ return conditions;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ conditions = in.readUTF();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(conditions);
+ }
+
+ @Override
+ public String toString() {
+ return conditions;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
new file mode 100644
index 0000000..bf84445
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcPartitioner.java
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.SimpleDateFormat;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+public class GenericJdbcPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> {
+
+ private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
+
+
+ private long numberPartitions;
+ private String partitionColumnName;
+ private int partitionColumnType;
+ private String partitionMinValue;
+ private String partitionMaxValue;
+ private Boolean partitionColumnNull;
+
+ @Override
+ public List<Partition> getPartitions(PartitionerContext context,ConnectionConfiguration connection, FromJobConfiguration job) {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+ numberPartitions = context.getMaxPartitions();
+ partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
+ partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
+ partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
+ partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
+
+ partitionColumnNull = job.table.partitionColumnNull;
+ if (partitionColumnNull == null) {
+ partitionColumnNull = false;
+ }
+
+ if (partitionMinValue == null && partitionMaxValue == null) {
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(partitionColumnName + " IS NULL");
+ partitions.add(partition);
+ return partitions;
+ }
+
+ if (partitionColumnNull) {
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(partitionColumnName + " IS NULL");
+ partitions.add(partition);
+ numberPartitions -= 1;
+ }
+
+ switch (partitionColumnType) {
+ case Types.TINYINT:
+ case Types.SMALLINT:
+ case Types.INTEGER:
+ case Types.BIGINT:
+ // Integer column
+ partitions.addAll(partitionIntegerColumn());
+ break;
+
+ case Types.REAL:
+ case Types.FLOAT:
+ case Types.DOUBLE:
+ // Floating point column
+ partitions.addAll(partitionFloatingPointColumn());
+ break;
+
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ // Decimal column
+ partitions.addAll(partitionNumericColumn());
+ break;
+
+ case Types.BIT:
+ case Types.BOOLEAN:
+ // Boolean column
+ return partitionBooleanColumn();
+
+ case Types.DATE:
+ case Types.TIME:
+ case Types.TIMESTAMP:
+ // Date time column
+ partitions.addAll(partitionDateTimeColumn());
+ break;
+
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ // Text column
+ partitions.addAll(partitionTextColumn());
+ break;
+
+ default:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
+ String.valueOf(partitionColumnType));
+ }
+
+ return partitions;
+ }
+
+ protected List<Partition> partitionDateTimeColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+ long minDateValue = 0;
+ long maxDateValue = 0;
+ SimpleDateFormat sdf = null;
+ switch(partitionColumnType) {
+ case Types.DATE:
+ sdf = new SimpleDateFormat("yyyy-MM-dd");
+ minDateValue = Date.valueOf(partitionMinValue).getTime();
+ maxDateValue = Date.valueOf(partitionMaxValue).getTime();
+ break;
+ case Types.TIME:
+ sdf = new SimpleDateFormat("HH:mm:ss");
+ minDateValue = Time.valueOf(partitionMinValue).getTime();
+ maxDateValue = Time.valueOf(partitionMaxValue).getTime();
+ break;
+ case Types.TIMESTAMP:
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
+ maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
+ break;
+ }
+
+
+ minDateValue += TimeZone.getDefault().getOffset(minDateValue);
+ maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);
+
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ long interval = (maxDateValue - minDateValue) / numberPartitions;
+ long remainder = (maxDateValue - minDateValue) % numberPartitions;
+
+ if (interval == 0) {
+ numberPartitions = (int)remainder;
+ }
+
+ long lowerBound;
+ long upperBound = minDateValue;
+
+ Object objLB = null;
+ Object objUB = null;
+
+ for (int i = 1; i < numberPartitions; i++) {
+ lowerBound = upperBound;
+ upperBound = lowerBound + interval;
+ upperBound += (i <= remainder) ? 1 : 0;
+
+ switch(partitionColumnType) {
+ case Types.DATE:
+ objLB = new Date(lowerBound);
+ objUB = new Date(upperBound);
+ break;
+ case Types.TIME:
+ objLB = new Time(lowerBound);
+ objUB = new Time(upperBound);
+
+ break;
+ case Types.TIMESTAMP:
+ objLB = new Timestamp(lowerBound);
+ objUB = new Timestamp(upperBound);
+ break;
+ }
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructDateConditions(sdf, objLB, objUB, false));
+ partitions.add(partition);
+ }
+
+ switch(partitionColumnType) {
+ case Types.DATE:
+ objLB = new Date(upperBound);
+ objUB = new Date(maxDateValue);
+ break;
+ case Types.TIME:
+ objLB = new Time(upperBound);
+ objUB = new Time(maxDateValue);
+ break;
+ case Types.TIMESTAMP:
+ objLB = new Timestamp(upperBound);
+ objUB = new Timestamp(maxDateValue);
+ break;
+ }
+
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructDateConditions(sdf, objLB, objUB, true));
+ partitions.add(partition);
+ return partitions;
+ }
+
/**
 * Splits a CHAR/VARCHAR partition column into string ranges.
 *
 * Strategy: strip the common prefix of the min/max bounds, map the
 * remaining suffixes onto BigDecimal fractions (base 2^21, see
 * textToBigDecimal), split that numeric interval evenly, and convert each
 * split point back into a string. Each partition becomes a
 * "lo &lt;= col AND col &lt; hi" condition (inclusive upper bound on the last).
 */
protected List<Partition> partitionTextColumn() {
  List<Partition> partitions = new LinkedList<Partition>();

  String minStringValue = null;
  String maxStringValue = null;

  // Remove common prefix if any as it does not affect outcome.
  int maxPrefixLen = Math.min(partitionMinValue.length(),
      partitionMaxValue.length());
  // Calculate common prefix length
  int cpLen = 0;

  for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
    char c1 = partitionMinValue.charAt(cpLen);
    char c2 = partitionMaxValue.charAt(cpLen);
    if (c1 != c2) {
      break;
    }
  }

  // The common prefix has length 'cpLen'. Extract it from both.
  String prefix = partitionMinValue.substring(0, cpLen);
  minStringValue = partitionMinValue.substring(cpLen);
  maxStringValue = partitionMaxValue.substring(cpLen);

  BigDecimal minStringBD = textToBigDecimal(minStringValue);
  BigDecimal maxStringBD = textToBigDecimal(maxStringValue);

  // Having one single value means that we can create only one single split
  if(minStringBD.equals(maxStringBD)) {
    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(constructTextConditions(prefix, 0, 0,
        partitionMinValue, partitionMaxValue, true, true));
    partitions.add(partition);
    return partitions;
  }

  // Get all the split points together.
  List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

  BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
      new BigDecimal(numberPartitions));
  // Clamp tiny steps so the accumulation loop below always terminates.
  if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
    splitSize = NUMERIC_MIN_INCREMENT;
  }

  BigDecimal curVal = minStringBD;

  int parts = 0;

  while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
    splitPoints.add(curVal);
    curVal = curVal.add(splitSize);
    // bigDecimalToText approximates to next comparison location.
    // Make sure we are still in range
    String text = bigDecimalToText(curVal);
    curVal = textToBigDecimal(text);
    ++parts;
  }

  // Ensure the exact lower bound is always the first split point.
  if (splitPoints.size() == 0
      || splitPoints.get(0).compareTo(minStringBD) != 0) {
    splitPoints.add(0, minStringBD);
  }

  // Ensure the exact upper bound is always the last split point.
  if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
      || splitPoints.size() == 1) {
    splitPoints.add(maxStringBD);
  }

  // Turn the split points into a set of string intervals.
  BigDecimal start = splitPoints.get(0);
  for (int i = 1; i < splitPoints.size(); i++) {
    BigDecimal end = splitPoints.get(i);

    GenericJdbcPartition partition = new GenericJdbcPartition();
    // The first/last flags make constructTextConditions use the exact
    // user-supplied bounds instead of the lossy numeric round-trip.
    partition.setConditions(constructTextConditions(prefix, start, end,
        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
    partitions.add(partition);

    start = end;
  }

  return partitions;
}
+
+
+ protected List<Partition> partitionIntegerColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+ long minValue = partitionMinValue == null ? Long.MIN_VALUE
+ : Long.parseLong(partitionMinValue);
+ long maxValue = Long.parseLong(partitionMaxValue);
+
+ long interval = (maxValue - minValue) / numberPartitions;
+ long remainder = (maxValue - minValue) % numberPartitions;
+
+ if (interval == 0) {
+ numberPartitions = (int)remainder;
+ }
+
+ long lowerBound;
+ long upperBound = minValue;
+ for (int i = 1; i < numberPartitions; i++) {
+ lowerBound = upperBound;
+ upperBound = lowerBound + interval;
+ upperBound += (i <= remainder) ? 1 : 0;
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructConditions(lowerBound, upperBound, false));
+ partitions.add(partition);
+ }
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructConditions(upperBound, maxValue, true));
+ partitions.add(partition);
+
+ return partitions;
+ }
+
+ protected List<Partition> partitionFloatingPointColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+
+ double minValue = partitionMinValue == null ? Double.MIN_VALUE
+ : Double.parseDouble(partitionMinValue);
+ double maxValue = Double.parseDouble(partitionMaxValue);
+
+ double interval = (maxValue - minValue) / numberPartitions;
+
+ double lowerBound;
+ double upperBound = minValue;
+ for (int i = 1; i < numberPartitions; i++) {
+ lowerBound = upperBound;
+ upperBound = lowerBound + interval;
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructConditions(lowerBound, upperBound, false));
+ partitions.add(partition);
+ }
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+ partition.setConditions(
+ constructConditions(upperBound, maxValue, true));
+ partitions.add(partition);
+
+ return partitions;
+ }
+
/**
 * Splits a NUMERIC/DECIMAL partition column range into intervals using
 * exact BigDecimal arithmetic. Both bounds must be non-null; each
 * partition becomes a "start &lt;= col AND col &lt; end" condition
 * (inclusive end on the last one).
 */
protected List<Partition> partitionNumericColumn() {
  List<Partition> partitions = new LinkedList<Partition>();
  // Having one end in null is not supported
  if (partitionMinValue == null || partitionMaxValue == null) {
    throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
  }

  BigDecimal minValue = new BigDecimal(partitionMinValue);
  BigDecimal maxValue = new BigDecimal(partitionMaxValue);

  // Having one single value means that we can create only one single split
  if(minValue.equals(maxValue)) {
    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(constructConditions(minValue));
    partitions.add(partition);
    return partitions;
  }

  // Get all the split points together.
  List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

  BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));

  // Clamp tiny steps so the accumulation loop below always terminates.
  if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
    splitSize = NUMERIC_MIN_INCREMENT;
  }

  BigDecimal curVal = minValue;

  while (curVal.compareTo(maxValue) <= 0) {
    splitPoints.add(curVal);
    curVal = curVal.add(splitSize);
  }

  // Replace the last split point with the exact maximum when the loop did
  // not land on it (or when only a single point was produced).
  if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
    splitPoints.remove(splitPoints.size() - 1);
    splitPoints.add(maxValue);
  }

  // Turn the split points into a set of intervals.
  BigDecimal start = splitPoints.get(0);
  for (int i = 1; i < splitPoints.size(); i++) {
    BigDecimal end = splitPoints.get(i);

    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
    partitions.add(partition);

    start = end;
  }

  return partitions;
}
+
+ protected List<Partition> partitionBooleanColumn() {
+ List<Partition> partitions = new LinkedList<Partition>();
+
+
+ Boolean minValue = parseBooleanValue(partitionMinValue);
+ Boolean maxValue = parseBooleanValue(partitionMaxValue);
+
+ StringBuilder conditions = new StringBuilder();
+
+ // Having one single value means that we can create only one single split
+ if(minValue.equals(maxValue)) {
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+
+ conditions.append(partitionColumnName).append(" = ")
+ .append(maxValue);
+ partition.setConditions(conditions.toString());
+ partitions.add(partition);
+ return partitions;
+ }
+
+ GenericJdbcPartition partition = new GenericJdbcPartition();
+
+ if (partitionMinValue == null) {
+ conditions = new StringBuilder();
+ conditions.append(partitionColumnName).append(" IS NULL");
+ partition.setConditions(conditions.toString());
+ partitions.add(partition);
+ }
+ partition = new GenericJdbcPartition();
+ conditions = new StringBuilder();
+ conditions.append(partitionColumnName).append(" = TRUE");
+ partition.setConditions(conditions.toString());
+ partitions.add(partition);
+ partition = new GenericJdbcPartition();
+ conditions = new StringBuilder();
+ conditions.append(partitionColumnName).append(" = FALSE");
+ partition.setConditions(conditions.toString());
+ partitions.add(partition);
+ return partitions;
+ }
+
+ private Boolean parseBooleanValue(String value) {
+ if (value == null) {
+ return null;
+ }
+ if (value.equals("1")) {
+ return Boolean.TRUE;
+ } else if (value.equals("0")) {
+ return Boolean.FALSE;
+ } else {
+ return Boolean.parseBoolean(value);
+ }
+ }
+
+ protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
+ try {
+ return numerator.divide(denominator);
+ } catch (ArithmeticException ae) {
+ return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+ }
+ }
+
+ protected String constructConditions(
+ Object lowerBound, Object upperBound, boolean lastOne) {
+ StringBuilder conditions = new StringBuilder();
+ conditions.append(lowerBound);
+ conditions.append(" <= ");
+ conditions.append(partitionColumnName);
+ conditions.append(" AND ");
+ conditions.append(partitionColumnName);
+ conditions.append(lastOne ? " <= " : " < ");
+ conditions.append(upperBound);
+ return conditions.toString();
+ }
+
+ protected String constructConditions(Object value) {
+ return new StringBuilder()
+ .append(partitionColumnName)
+ .append(" = ")
+ .append(value)
+ .toString()
+ ;
+ }
+
+ protected String constructDateConditions(SimpleDateFormat sdf,
+ Object lowerBound, Object upperBound, boolean lastOne) {
+ StringBuilder conditions = new StringBuilder();
+ conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
+ conditions.append(" <= ");
+ conditions.append(partitionColumnName);
+ conditions.append(" AND ");
+ conditions.append(partitionColumnName);
+ conditions.append(lastOne ? " <= " : " < ");
+ conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
+ return conditions.toString();
+ }
+
+ protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
+ String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
+ StringBuilder conditions = new StringBuilder();
+ String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
+ String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
+ conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
+ conditions.append(" <= ");
+ conditions.append(partitionColumnName);
+ conditions.append(" AND ");
+ conditions.append(partitionColumnName);
+ conditions.append(lastOne ? " <= " : " < ");
+ conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
+ return conditions.toString();
+ }
+
/**
 * Converts a string to a BigDecimal representation in Base 2^21 format.
 * The maximum Unicode code point value defined is 10FFFF. Although
 * not all database systems support UTF16 and mostly we expect UCS2
 * characters only, for completeness, we assume that all the unicode
 * characters are supported.
 * Given a string 's' containing characters s_0, s_1, ... s_n,
 * the string is interpreted as the base-2^21 fraction:
 * 0.s_0 s_1 s_2 s_3 ... s_n
 * This can be split and each split point can be converted back to
 * a string value for comparison purposes. The number of characters
 * is restricted to prevent repeating fractions and rounding errors
 * towards the higher fraction positions.
 */
// Radix of the fractional encoding: one "digit" per Unicode code point.
private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
// Cap on converted code points, bounding rounding error in low positions.
private static final int MAX_CHARS_TO_CONVERT = 4;
+
+ private BigDecimal textToBigDecimal(String str) {
+ BigDecimal result = BigDecimal.ZERO;
+ BigDecimal divisor = UNITS_BASE;
+
+ int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
+
+ for (int n = 0; n < len; ) {
+ int codePoint = str.codePointAt(n);
+ n += Character.charCount(codePoint);
+ BigDecimal val = divide(new BigDecimal(codePoint), divisor);
+ result = result.add(val);
+ divisor = divisor.multiply(UNITS_BASE);
+ }
+
+ return result;
+ }
+
/**
 * Converts a base-2^21 fractional BigDecimal (as produced by
 * textToBigDecimal) back into a string of at most MAX_CHARS_TO_CONVERT
 * Unicode code points, substituting the nearest defined code point when
 * an extracted value is not an assigned Unicode character.
 */
private String bigDecimalToText(BigDecimal bd) {
  BigDecimal curVal = bd.stripTrailingZeros();
  StringBuilder sb = new StringBuilder();

  for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
    // Shift the next base-2^21 "digit" (one code point) into the integer part.
    curVal = curVal.multiply(UNITS_BASE);
    int cp = curVal.intValue();
    if (0 >= cp) {
      break;
    }

    // The extracted value may not be an assigned code point (the split
    // arithmetic can land anywhere in the 2^21 space); scan forward,
    // wrapping around at MAX_CODE_POINT, until a defined one is found.
    if (!Character.isDefined(cp)) {
      int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
      // We are guaranteed to find at least one character
      while(!Character.isDefined(t_cp)) {
        ++t_cp;
        // Full wrap-around back to the starting point: give up the scan.
        if (t_cp == cp) {
          break;
        }
        if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0) {
          t_cp = 1;
        }
      }
      cp = t_cp;
    }
    // NOTE(review): the SUBSTITUTED cp is subtracted, not the raw digit,
    // which slightly skews the remaining digits — presumably acceptable
    // since these strings are only used as comparison split points; confirm.
    curVal = curVal.subtract(new BigDecimal(cp));
    sb.append(Character.toChars(cp));
  }

  return sb.toString();
}
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
new file mode 100644
index 0000000..6be3e12
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToDestroyer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class GenericJdbcToDestroyer extends Destroyer<ConnectionConfiguration, ToJobConfiguration> {
+
+ private static final Logger LOG = Logger.getLogger(GenericJdbcToDestroyer.class);
+
+ @Override
+ public void destroy(DestroyerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+ LOG.info("Running generic JDBC connector destroyer");
+
+ final String tableName = job.table.tableName;
+ final String stageTableName = job.table.stageTableName;
+ final boolean stageEnabled = stageTableName != null &&
+ stageTableName.length() > 0;
+ if(stageEnabled) {
+ moveDataToDestinationTable(connection,
+ context.isSuccess(), stageTableName, tableName);
+ }
+ }
+
+ private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
+ boolean success, String stageTableName, String tableName) {
+ GenericJdbcExecutor executor =
+ new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
+ connectorConf.connection.connectionString,
+ connectorConf.connection.username,
+ connectorConf.connection.password);
+
+ if(success) {
+ LOG.info("Job completed, transferring data from stage table to " +
+ "destination table.");
+ executor.migrateData(stageTableName, tableName);
+ } else {
+ LOG.warn("Job failed, clearing stage table.");
+ executor.deleteTableData(stageTableName);
+ }
+ }
+
+}