You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/03/16 18:14:54 UTC
sqoop git commit: SQOOP-1805: Sqoop2: GenericJdbcConnector: Delta
read support
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 3f618c917 -> e6519c76c
SQOOP-1805: Sqoop2: GenericJdbcConnector: Delta read support
(Jarek Jarcec Cecho via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e6519c76
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e6519c76
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e6519c76
Branch: refs/heads/sqoop2
Commit: e6519c76c673caaf2816b5e68183d1723d777e01
Parents: 3f618c9
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Mar 16 10:13:05 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Mar 16 10:13:05 2015 -0700
----------------------------------------------------------------------
.../error/code/GenericJdbcConnectorError.java | 4 +
.../jdbc/GenericJdbcConnectorConstants.java | 2 +
.../connector/jdbc/GenericJdbcExecutor.java | 9 +
.../jdbc/GenericJdbcFromInitializer.java | 186 +++++++++++--------
.../configuration/FromJobConfiguration.java | 3 +
.../jdbc/configuration/IncrementalRead.java | 49 +++++
.../generic-jdbc-connector-config.properties | 14 +-
.../connector/jdbc/TestFromInitializer.java | 120 ++++++++++++
.../jdbc/TestGenericJdbcConnector.java | 77 ++++++++
.../jdbc/generic/IncrementalReadTest.java | 176 ++++++++++++++++++
10 files changed, 560 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
index 03bc104..f18acbd 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
@@ -85,6 +85,10 @@ public enum GenericJdbcConnectorError implements ErrorCode {
GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"),
+ GENERIC_JDBC_CONNECTOR_0022("Can't find maximal value of column"),
+
+ GENERIC_JDBC_CONNECTOR_0023("Received error from the database"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
index 4369071..dc86821 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
@@ -41,6 +41,8 @@ public final class GenericJdbcConnectorConstants {
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue";
public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
+ public static final String CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE =
+ PREFIX_CONNECTOR_JDBC_CONFIG + "incremental.last_value";
public static final String CONNECTOR_JDBC_FROM_DATA_SQL =
PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 7a01992..5af34a5 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -72,6 +72,15 @@ public class GenericJdbcExecutor {
}
}
+ public PreparedStatement createStatement(String sql) {
+ try {
+ return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ } catch (SQLException e) {
+ logSQLException(e);
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+ }
+ }
+
public void setAutoCommit(boolean autoCommit) {
try {
connection.setAutoCommit(autoCommit);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 1ecd152..6ad2cab 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.connector.jdbc;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -50,6 +51,8 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
try {
configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
configureTableProperties(context.getContext(), linkConfig, fromJobConfig);
+ } catch(SQLException e) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
} finally {
executor.close();
}
@@ -124,108 +127,141 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
executor = new GenericJdbcExecutor(driver, url, username, password);
}
- private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
- // ----- configure column name -----
-
- String partitionColumnName = fromJobConfig.fromJobConfig.partitionColumn;
-
- if (partitionColumnName == null) {
- // if column is not specified by the user,
- // find the primary key of the fromTable (when there is a fromTable).
- String tableName = fromJobConfig.fromJobConfig.tableName;
- if (tableName != null) {
- partitionColumnName = executor.getPrimaryKey(tableName);
- }
+ private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException {
+ // Assertions that should be valid (verified via validator)
+ assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) ||
+ (jobConf.fromJobConfig.tableName == null && jobConf.fromJobConfig.sql != null);
+ assert (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn == null) ||
+ (jobConf.fromJobConfig.boundaryQuery != null && jobConf.incrementalRead.checkColumn == null) ||
+ (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn != null);
+
+ // We have a few if/else conditions based on import type
+ boolean tableImport = jobConf.fromJobConfig.tableName != null;
+ boolean incrementalImport = jobConf.incrementalRead.checkColumn != null;
+
+ // For generating queries
+ StringBuilder sb = new StringBuilder();
+
+ // Partition column name
+ String partitionColumnName = jobConf.fromJobConfig.partitionColumn;
+ // If it's not specified, we can use primary key of given table (if it's table based import)
+ if (StringUtils.isBlank(partitionColumnName) && tableImport) {
+ partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.tableName);
}
-
+ // If we don't have partition column name, we will error out
if (partitionColumnName != null) {
- context.setString(
- GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
- partitionColumnName);
-
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, partitionColumnName);
} else {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
}
+ LOG.info("Using partition column: " + partitionColumnName);
- // ----- configure column type, min value, and max value -----
-
- String minMaxQuery = fromJobConfig.fromJobConfig.boundaryQuery;
+ // From fragment for subsequent queries
+ String fromFragment;
+ if(tableImport) {
+ String tableName = jobConf.fromJobConfig.tableName;
+ String schemaName = jobConf.fromJobConfig.schemaName;
- if (minMaxQuery == null) {
- StringBuilder builder = new StringBuilder();
-
- String schemaName = fromJobConfig.fromJobConfig.schemaName;
- String tableName = fromJobConfig.fromJobConfig.tableName;
- String tableSql = fromJobConfig.fromJobConfig.sql;
+ fromFragment = executor.delimitIdentifier(tableName);
+ if(schemaName != null) {
+ fromFragment = executor.delimitIdentifier(schemaName) + "." + fromFragment;
+ }
+ } else {
+ sb.setLength(0);
+ sb.append("(");
+ sb.append(jobConf.fromJobConfig.sql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
+ sb.append(") ");
+ sb.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+ fromFragment = sb.toString();
+ }
- if (tableName != null && tableSql != null) {
- // when both fromTable name and fromTable sql are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+ // If this is incremental, then we need to get new maximal value and persist it as a constant
+ String incrementalMaxValue = null;
+ if(incrementalImport) {
+ sb.setLength(0);
+ sb.append("SELECT ");
+ sb.append("MAX(").append(jobConf.incrementalRead.checkColumn).append(") ");
+ sb.append("FROM ");
+ sb.append(fromFragment);
- } else if (tableName != null) {
- // when fromTable name is specified:
+ String incrementalNewMaxValueQuery = sb.toString();
+ LOG.info("Incremental new max value query: " + incrementalNewMaxValueQuery);
- // For databases that support schemas (IE: postgresql).
- String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+ ResultSet rs = null;
+ try {
+ rs = executor.executeQuery(incrementalNewMaxValueQuery);
- String column = partitionColumnName;
- builder.append("SELECT MIN(");
- builder.append(column);
- builder.append("), MAX(");
- builder.append(column);
- builder.append(") FROM ");
- builder.append(fullTableName);
+ if (!rs.next()) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
+ }
- } else if (tableSql != null) {
- String column = executor.qualify(
- partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
- builder.append("SELECT MIN(");
- builder.append(column);
- builder.append("), MAX(");
- builder.append(column);
- builder.append(") FROM ");
- builder.append("(");
- builder.append(tableSql.replace(
- GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
- builder.append(") ");
- builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+ incrementalMaxValue = rs.getString(1);
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue);
+ LOG.info("New maximal value for incremental import is " + incrementalMaxValue);
+ } finally {
+ if(rs != null) {
+ rs.close();
+ }
+ }
+ }
- } else {
- // when neither are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+ // Retrieving min and max values for partition column
+ String minMaxQuery = jobConf.fromJobConfig.boundaryQuery;
+ if (minMaxQuery == null) {
+ sb.setLength(0);
+ sb.append("SELECT ");
+ sb.append("MIN(").append(partitionColumnName).append("), ");
+ sb.append("MAX(").append(partitionColumnName).append(") ");
+ sb.append("FROM ").append(fromFragment).append(" ");
+
+ if(incrementalImport) {
+ sb.append("WHERE ");
+ sb.append(jobConf.incrementalRead.checkColumn).append(" > ?");
+ sb.append(" AND ");
+ sb.append(jobConf.incrementalRead.checkColumn).append(" <= ?");
}
- minMaxQuery = builder.toString();
+ minMaxQuery = sb.toString();
}
+ LOG.info("Using min/max query: " + minMaxQuery);
-
- LOG.debug("Using minMaxQuery: " + minMaxQuery);
- ResultSet rs = executor.executeQuery(minMaxQuery);
+ PreparedStatement ps = null;
+ ResultSet rs = null;
try {
- ResultSetMetaData rsmd = rs.getMetaData();
- if (rsmd.getColumnCount() != 2) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+ ps = executor.createStatement(minMaxQuery);
+ if (incrementalImport) {
+ ps.setString(1, jobConf.incrementalRead.lastValue);
+ ps.setString(2, incrementalMaxValue);
}
- rs.next();
+ rs = ps.executeQuery();
+ if(!rs.next()) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+ }
- int columnType = rsmd.getColumnType(1);
+ // Boundaries for the job
String min = rs.getString(1);
String max = rs.getString(2);
- LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
+ // Type of the partition column
+ ResultSetMetaData rsmd = rs.getMetaData();
+ if (rsmd.getColumnCount() != 2) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+ }
+ int columnType = rsmd.getColumnType(1);
+
+ LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);
context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
-
- } catch (SQLException e) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
+ } finally {
+ if(ps != null) {
+ ps.close();
+ }
+ if(rs != null) {
+ rs.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
index 39e8edd..d11b3b1 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
@@ -27,7 +27,10 @@ import org.apache.sqoop.model.Config;
public class FromJobConfiguration {
@Config public FromJobConfig fromJobConfig;
+ @Config public IncrementalRead incrementalRead;
+
public FromJobConfiguration() {
fromJobConfig = new FromJobConfig();
+ incrementalRead = new IncrementalRead();
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
new file mode 100644
index 0000000..f226532
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.InputEditable;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+
+/**
+ */
+@ConfigClass(validators = {@Validator(IncrementalRead.ConfigValidator.class)})
+public class IncrementalRead {
+ @Input(size = 50)
+ public String checkColumn;
+
+ @Input(editable = InputEditable.ANY)
+ public String lastValue;
+
+ public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
+ @Override
+ public void validate(IncrementalRead conf) {
+ if(conf.checkColumn != null && conf.lastValue == null) {
+ addMessage(Status.ERROR, "Last value is required during incremental read");
+ }
+
+ if(conf.checkColumn == null && conf.lastValue != null) {
+ addMessage(Status.ERROR, "Last value can't be filled without check column.");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
index 6a2159b..52bf631 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
@@ -112,9 +112,13 @@ toJobConfig.stageTableName.help = Name of the staging table to use (Optional)
toJobConfig.shouldClearStageTable.label = Should clear stage table
toJobConfig.shouldClearStageTable.help = Indicate if the stage table should be cleared (Defaults to false)
-# Placeholders to have some entities created
-ignored.label = Ignored
-ignored.help = This is completely ignored
+# Incremental related configuration
+incrementalRead.label = Incremental read
+incrementalRead.help = Configuration related to incremental read
+
+incrementalRead.checkColumn.label = Check column
+incrementalRead.checkColumn.help = Column that is checked during incremental read for new values
+
+incrementalRead.lastValue.label = Last value
+incrementalRead.lastValue.help = Last read value, fetch will resume with higher values
-ignored.ignored.label = Ignored
-ignored.ignored.help = This is completely ignored
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index 52003ab..e9c8d41 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -142,6 +142,66 @@ public class TestFromInitializer {
@Test
@SuppressWarnings("unchecked")
+ public void testIncrementalTableNameFullRange() throws Exception {
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+ linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+ jobConfig.fromJobConfig.tableName = schemalessTableName;
+ jobConfig.incrementalRead.checkColumn = "ICOL";
+ jobConfig.incrementalRead.lastValue = "-51";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcFromInitializer();
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ verifyResult(context,
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(START),
+ String.valueOf(START+NUMBER_OF_ROWS-1));
+
+ assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testIncrementalTableNameFromZero() throws Exception {
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+ linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+ jobConfig.fromJobConfig.tableName = schemalessTableName;
+ jobConfig.incrementalRead.checkColumn = "ICOL";
+ jobConfig.incrementalRead.lastValue = "0";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcFromInitializer();
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ verifyResult(context,
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(1),
+ String.valueOf(START+NUMBER_OF_ROWS-1));
+
+ assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testTableNameWithTableColumns() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
@@ -198,6 +258,66 @@ public class TestFromInitializer {
@Test
@SuppressWarnings("unchecked")
+ public void testIncrementalTableSqlFullRange() throws Exception {
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+ linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+ jobConfig.fromJobConfig.sql = schemalessTableSql;
+ jobConfig.fromJobConfig.partitionColumn = "ICOL";
+ jobConfig.incrementalRead.checkColumn = "ICOL";
+ jobConfig.incrementalRead.lastValue = "-51";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcFromInitializer();
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ verifyResult(context,
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(START),
+ String.valueOf((START+NUMBER_OF_ROWS-1)));
+ assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testIncrementalTableSqlFromZero() throws Exception {
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+ linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+ linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+ jobConfig.fromJobConfig.sql = schemalessTableSql;
+ jobConfig.fromJobConfig.partitionColumn = "ICOL";
+ jobConfig.incrementalRead.checkColumn = "ICOL";
+ jobConfig.incrementalRead.lastValue = "0";
+
+ MutableContext context = new MutableMapContext();
+ InitializerContext initializerContext = new InitializerContext(context);
+
+ @SuppressWarnings("rawtypes")
+ Initializer initializer = new GenericJdbcFromInitializer();
+ initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+ verifyResult(context,
+ "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+ "ICOL,DCOL,VCOL",
+ "ICOL",
+ String.valueOf(Types.INTEGER),
+ String.valueOf(1),
+ String.valueOf((START+NUMBER_OF_ROWS-1)));
+ assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testTableSqlWithTableColumns() throws Exception {
LinkConfiguration linkConfig = new LinkConfiguration();
FromJobConfiguration jobConfig = new FromJobConfiguration();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
new file mode 100644
index 0000000..cc1c58f
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MInput;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ */
+public class TestGenericJdbcConnector {
+
+ @Test
+ public void testBundleForLink() {
+ GenericJdbcConnector connector = new GenericJdbcConnector();
+ verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass());
+ }
+
+ @Test
+ void testBundleForJobToDirection() {
+ GenericJdbcConnector connector = new GenericJdbcConnector();
+ verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO));
+ }
+
+ @Test
+ void testBundleForJobFromDirection() {
+ GenericJdbcConnector connector = new GenericJdbcConnector();
+ verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM));
+ }
+
+ void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) {
+ assertNotNull(bundle);
+ assertNotNull(klass);
+
+ List<MConfig> configs = ConfigUtils.toConfigs(klass);
+
+ for(MConfig config : configs) {
+ assertNotNull(config.getHelpKey());
+ assertNotNull(config.getLabelKey());
+
+ assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
+ assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
+
+ for(MInput input : config.getInputs()) {
+ assertNotNull(input.getHelpKey());
+ assertNotNull(input.getLabelKey());
+
+ assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
+ assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
new file mode 100644
index 0000000..716de30
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.jdbc.generic;
+
+import com.google.common.collect.Iterables;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.utils.ParametrizedUtils;
+import org.testng.ITest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ * Integration test for incremental reads with the generic JDBC connector.
+ *
+ * <p>A TestNG factory creates one instance of this class per row of
+ * {@link #COLUMNS}. Each instance transfers the Ubuntu releases table to HDFS
+ * with {@code incrementalRead.checkColumn} and {@code incrementalRead.lastValue}
+ * set, once reading by table name and once by free-form query, and expects the
+ * output to consist of rows 10 through 19 only.
+ */
+public class IncrementalReadTest extends ConnectorTestCase implements ITest {
+
+  /**
+   * Test parameters, one row per factory-created instance:
+   * check column, last value, new max value.
+   */
+  public static Object[] COLUMNS = new Object [][] {
+    // column - last value - new max value
+    { "id", "9", "19"},
+    { "version", "8.10", "13.10"},
+    {"release_date", "2008-10-18", "2013-10-17"},
+  };
+
+  private final String checkColumn;
+  private final String lastValue;
+  // Stored for the pending max-value verification (see TODO in transferAndVerify); currently unused.
+  private final String newMaxValue;
+
+  // Name of the test method about to run; set in beforeMethod() and used by getTestName().
+  private String testName;
+
+  /**
+   * Creates a test instance for one row of {@link #COLUMNS}.
+   *
+   * @param checkColumn column used as the incremental check column
+   * @param lastValue value passed as {@code incrementalRead.lastValue}
+   * @param newMaxValue new max value associated with this parameter row
+   */
+  @Factory(dataProvider="incremental-integration-test")
+  public IncrementalReadTest(String checkColumn, String lastValue, String newMaxValue) {
+    this.checkColumn = checkColumn;
+    this.lastValue = lastValue;
+    this.newMaxValue = newMaxValue;
+  }
+
+  /**
+   * Supplies the constructor arguments for the factory, derived from {@link #COLUMNS}.
+   */
+  @DataProvider(name="incremental-integration-test", parallel=true)
+  public static Object[][] data() {
+    return Iterables.toArray(ParametrizedUtils.toArrayOfArrays(COLUMNS), Object[].class);
+  }
+
+  /**
+   * Incremental read where the source is given as a table name.
+   */
+  @Test
+  public void testTable() throws Exception {
+    createAndLoadTableUbuntuReleases();
+    try {
+      transferAndVerify("fromJobConfig.tableName", provider.escapeTableName(getTableName()));
+    } finally {
+      // Clean up testing table even when the job or the assertions fail
+      dropTable();
+    }
+  }
+
+  /**
+   * Incremental read where the source is given as a free-form query.
+   */
+  @Test
+  public void testQuery() throws Exception {
+    createAndLoadTableUbuntuReleases();
+    try {
+      String query = "SELECT * FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}";
+      transferAndVerify("fromJobConfig.sql", query);
+    } finally {
+      // Clean up testing table even when the job or the assertions fail
+      dropTable();
+    }
+  }
+
+  /**
+   * Creates the RDBMS and HDFS links, configures and runs an incremental read
+   * job between them, and asserts the transferred rows.
+   *
+   * @param sourceInput name of the FROM config input that identifies the source
+   *                    ({@code fromJobConfig.tableName} or {@code fromJobConfig.sql})
+   * @param sourceValue value to set on that input
+   */
+  private void transferAndVerify(String sourceInput, String sourceValue) throws Exception {
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    // Set the rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput(sourceInput).setValue(sourceValue);
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
+    fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
+
+    // Fill hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+      "10,'Jaunty Jackalope',9.04,'2009-04-23',false",
+      "11,'Karmic Koala',9.10,'2009-10-29',false",
+      "12,'Lucid Lynx',10.04,'2010-04-29',true",
+      "13,'Maverick Meerkat',10.10,'2010-10-10',false",
+      "14,'Natty Narwhal',11.04,'2011-04-28',false",
+      "15,'Oneiric Ocelot',11.10,'2011-10-10',false",
+      "16,'Precise Pangolin',12.04,'2012-04-26',true",
+      "17,'Quantal Quetzal',12.10,'2012-10-18',false",
+      "18,'Raring Ringtail',13.04,'2013-04-25',false",
+      "19,'Saucy Salamander',13.10,'2013-10-17',false"
+    );
+
+    // TODO: Once Sqoop properly updates configuration objects, verify the new max value as well
+  }
+
+  /**
+   * Records the name of the test method that is about to run.
+   */
+  @BeforeMethod(alwaysRun = true)
+  public void beforeMethod(Method aMethod) {
+    this.testName = aMethod.getName();
+  }
+
+  /**
+   * Returns the current method name suffixed with the check column in brackets.
+   */
+  @Override
+  public String getTestName() {
+    return testName + "[" + checkColumn + "]";
+  }
+}